diff --git a/lib/xtb_client/application.ex b/lib/xtb_client/application.ex new file mode 100644 index 0000000..ace65a6 --- /dev/null +++ b/lib/xtb_client/application.ex @@ -0,0 +1,20 @@ +defmodule XtbClient.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + # Starts a worker by calling: XtbClient.Worker.start_link(arg) + # {XtbClient.Worker, arg} + ] + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: XtbClient.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/lib/xtb_client/connection.ex b/lib/xtb_client/connection.ex new file mode 100644 index 0000000..2ea4d05 --- /dev/null +++ b/lib/xtb_client/connection.ex @@ -0,0 +1,615 @@ +defmodule XtbClient.Connection do + @moduledoc """ + `GenServer` which handles all commands and queries issued to XTB platform. + + Acts as a proxy process between the client and the underlying main and streaming socket. + + After successful initialization the process should hold references to both `XtbClient.MainSocket` and `XtbClient.StreamingSocket` processes, + so it can mediate all commands and queries from the caller to the connected socket. + + For the synchronous operations clients should expect to get results as the function returned value. + + ## Example of synchronous call + + ``` + params = %{app_name: "XtbClient", type: :demo, url: "wss://ws.xtb.com", user: "<>", password: "<>"} + {:ok, pid} = XtbClient.Connection.start_link(params) + + version = XtbClient.Connection.get_version(pid) + # expect to see the actual version of the backend server + ``` + + Asynchronous operations, mainly `subscribe_` functions, returns immediately and stores the `pid` of the subscriber, so later it can send the message there. + Note that each `subscribe_` function expects the `subscriber` as an argument, so `XtbClient.Connection` could serve different types + of events to different subscribers. Only limitation is that each `subscribe_` function handles only one subscriber. + + ## Example of asynchronous call + + ``` + defmodule StreamListener do + use GenServer + + def start_link(args) do + name = Map.get(args, "name") |> String.to_atom() + GenServer.start_link(__MODULE__, args, name: name) + end + + @impl true + def init(state) do + {:ok, state} + end + + @impl true + def handle_info(message, state) do + IO.inspect({self(), message}, label: "Listener handle info") + {:noreply, state} + end + end + + + params = %{app_name: "XtbClient", type: :demo, url: "wss://ws.xtb.com", user: "<>", password: "<>"} + {:ok, cpid} = XtbClient.Connection.start_link(params) + + args = %{symbol: "LITECOIN"} + query = XtbClient.Messages.Quotations.Query.new(args) + {:ok, lpid} = StreamListener.start_link(%{"name" => args.symbol}) + XtbClient.Connection.subscribe_get_tick_prices(cpid, lpid, query) + # expect to see logs from StreamListener process with tick pricess logged + ``` + """ + use GenServer + + alias XtbClient.{MainSocket, StreamingSocket, StreamingMessage} + + alias XtbClient.Messages.{ + Candle, + Candles, + ChartLast, + ChartRange, + DateRange, + ProfitCalculation, + RateInfos, + Quotations, + SymbolInfo, + SymbolVolume, + TickPrices, + TradeInfos, + Trades, + TradeTransaction, + TradeTransactionStatus, + TradingHours + } + + require Logger + + defmodule State do + @moduledoc false + @enforce_keys [ + :type, + :url, + :clients, + :subscribers + ] + defstruct type: nil, + url: nil, + mpid: nil, + spid: nil, + clients: %{}, + subscribers: %{}, + stream_session_id: nil + end + + @doc """ + Starts a `XtbClient.Connection` process linked to the calling process. + """ + @spec start_link(any(), GenServer.options()) :: GenServer.on_start() + def start_link(_args, opts), do: start_link(opts) + + @doc """ + Starts a `XtbClient.Connection` process linked to the calling process. + """ + @spec start_link(GenServer.options()) :: GenServer.on_start() + def start_link(opts) do + {connection_opts, conn_opts} = Keyword.split(opts, [:connection]) + init_opts = Keyword.fetch!(connection_opts, :connection) + GenServer.start_link(__MODULE__, init_opts, conn_opts) + end + + @impl true + def init(opts) do + {:ok, mpid} = MainSocket.start_link(opts) + Process.flag(:trap_exit, true) + + Process.sleep(500) + MainSocket.stream_session_id(mpid, self()) + + type = get_in(opts, [:type]) + url = get_in(opts, [:url]) + + state = %State{ + mpid: mpid, + spid: nil, + type: type, + url: url, + clients: %{}, + subscribers: %{} + } + + {:ok, state} + end + + @doc """ + Returns array of all symbols available for the user. + """ + @spec get_all_symbols(GenServer.server()) :: XtbClient.Messages.SymbolInfos.t() + def get_all_symbols(pid) do + GenServer.call(pid, {"getAllSymbols"}) + end + + @doc """ + Returns calendar with market events. + """ + @spec get_calendar(GenServer.server()) :: XtbClient.Messages.CalendarInfos.t() + def get_calendar(pid) do + GenServer.call(pid, {"getCalendar"}) + end + + @doc """ + Returns chart info from start date to the current time. + + If the chosen period of `XtbClient.Messages.ChartLast.Query` is greater than 1 minute, the last candle returned by the API can change until the end of the period (the candle is being automatically updated every minute). + + Limitations: there are limitations in charts data availability. Detailed ranges for charts data, what can be accessed with specific period, are as follows: + + - PERIOD_M1 --- <0-1) month, i.e. one month time + - PERIOD_M30 --- <1-7) month, six months time + - PERIOD_H4 --- <7-13) month, six months time + - PERIOD_D1 --- 13 month, and earlier on + + Note, that specific PERIOD_ is the lowest (i.e. the most detailed) period, accessible in listed range. For instance, in months range <1-7) you can access periods: PERIOD_M30, PERIOD_H1, PERIOD_H4, PERIOD_D1, PERIOD_W1, PERIOD_MN1. + Specific data ranges availability is guaranteed, however those ranges may be wider, e.g.: PERIOD_M1 may be accessible for 1.5 months back from now, where 1.0 months is guaranteed. + + ## Example scenario: + + * request charts of 5 minutes period, for 3 months time span, back from now; + * response: you are guaranteed to get 1 month of 5 minutes charts; because, 5 minutes period charts are not accessible 2 months and 3 months back from now + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_candles/3` which is the preferred way of retrieving current candle data.** + """ + @spec get_chart_last( + GenServer.server(), + XtbClient.Messages.ChartLast.Query.t() + ) :: XtbClient.Messages.RateInfos.t() + def get_chart_last(pid, %ChartLast.Query{} = params) do + GenServer.call(pid, {"getChartLastRequest", %{info: params}}) + end + + @doc """ + Returns chart info with data between given start and end dates. + + Limitations: there are limitations in charts data availability. Detailed ranges for charts data, what can be accessed with specific period, are as follows: + + - PERIOD_M1 --- <0-1) month, i.e. one month time + - PERIOD_M30 --- <1-7) month, six months time + - PERIOD_H4 --- <7-13) month, six months time + - PERIOD_D1 --- 13 month, and earlier on + + Note, that specific PERIOD_ is the lowest (i.e. the most detailed) period, accessible in listed range. For instance, in months range <1-7) you can access periods: PERIOD_M30, PERIOD_H1, PERIOD_H4, PERIOD_D1, PERIOD_W1, PERIOD_MN1. + Specific data ranges availability is guaranteed, however those ranges may be wider, e.g.: PERIOD_M1 may be accessible for 1.5 months back from now, where 1.0 months is guaranteed. + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_candles/3` which is the preferred way of retrieving current candle data.** + """ + @spec get_chart_range(GenServer.server(), XtbClient.Messages.ChartRange.Query.t()) :: + XtbClient.Messages.RateInfos.t() + def get_chart_range(pid, %ChartRange.Query{} = params) do + GenServer.call(pid, {"getChartRangeRequest", %{info: params}}) + end + + @doc """ + Returns calculation of commission and rate of exchange. + + The value is calculated as expected value and therefore might not be perfectly accurate. + """ + @spec get_commission_def(GenServer.server(), XtbClient.Messages.SymbolVolume.t()) :: + XtbClient.Messages.CommissionDefinition.t() + def get_commission_def(pid, %SymbolVolume{} = params) do + GenServer.call(pid, {"getCommissionDef", params}) + end + + @doc """ + Returns information about account currency and account leverage. + """ + @spec get_current_user_data(GenServer.server()) :: XtbClient.Messages.UserInfo.t() + def get_current_user_data(pid) do + GenServer.call(pid, {"getCurrentUserData"}) + end + + @doc """ + Returns IBs data from the given time range. + """ + @spec get_ibs_history(GenServer.server(), XtbClient.Messages.DateRange.t()) :: any() + def get_ibs_history(pid, %DateRange{} = params) do + GenServer.call(pid, {"getIbsHistory", params}) + end + + @doc """ + Returns various account indicators. + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_balance/2` which is the preferred way of retrieving current account indicators.** + """ + @spec get_margin_level(GenServer.server()) :: XtbClient.Messages.BalanceInfo.t() + def get_margin_level(pid) do + GenServer.call(pid, {"getMarginLevel"}) + end + + @doc """ + Returns expected margin for given instrument and volume. + + The value is calculated as expected margin value and therefore might not be perfectly accurate. + """ + @spec get_margin_trade(GenServer.server(), XtbClient.Messages.SymbolVolume.t()) :: + XtbClient.Messages.MarginTrade.t() + def get_margin_trade(pid, %SymbolVolume{} = params) do + GenServer.call(pid, {"getMarginTrade", params}) + end + + @doc """ + Returns news from trading server which were sent within specified period of time. + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_news/2` which is the preferred way of retrieving news data.** + """ + @spec get_news(GenServer.server(), XtbClient.Messages.DateRange.t()) :: + XtbClient.Messages.NewsInfos.t() + def get_news(pid, %DateRange{} = params) do + GenServer.call(pid, {"getNews", params}) + end + + @doc """ + Calculates estimated profit for given deal data. + + Should be used for calculator-like apps only. + Profit for opened transactions should be taken from server, due to higher precision of server calculation. + """ + @spec get_profit_calculation(GenServer.server(), XtbClient.Messages.ProfitCalculation.Query.t()) :: + XtbClient.Messages.ProfitCalculation.t() + def get_profit_calculation(pid, %ProfitCalculation.Query{} = params) do + GenServer.call(pid, {"getProfitCalculation", params}) + end + + @doc """ + Returns current time on trading server. + """ + @spec get_server_time(GenServer.server()) :: XtbClient.Messages.ServerTime.t() + def get_server_time(pid) do + GenServer.call(pid, {"getServerTime"}) + end + + @doc """ + Returns a list of step rules for DMAs. + """ + @spec get_step_rules(GenServer.server()) :: XtbClient.Messages.StepRules.t() + def get_step_rules(pid) do + GenServer.call(pid, {"getStepRules"}) + end + + @doc """ + Returns information about symbol available for the user. + """ + @spec get_symbol(GenServer.server(), XtbClient.Messages.SymbolInfo.Query.t()) :: + XtbClient.Messages.SymbolInfo.t() + def get_symbol(pid, %SymbolInfo.Query{} = params) do + GenServer.call(pid, {"getSymbol", params}) + end + + @doc """ + Returns array of current quotations for given symbols, only quotations that changed from given timestamp are returned. + + New timestamp obtained from output will be used as an argument of the next call of this command. + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_tick_prices/3` which is the preferred way of retrieving ticks data.** + """ + @spec get_tick_prices(GenServer.server(), XtbClient.Messages.TickPrices.Query.t()) :: + XtbClient.Messages.TickPrices.t() + def get_tick_prices(pid, %TickPrices.Query{} = params) do + GenServer.call(pid, {"getTickPrices", params}) + end + + @doc """ + Returns array of trades listed in orders query. + """ + @spec get_trade_records(GenServer.server(), XtbClient.Messages.TradeInfos.Query.t()) :: + XtbClient.Messages.TradeInfos.t() + def get_trade_records(pid, %TradeInfos.Query{} = params) do + GenServer.call(pid, {"getTradeRecords", params}) + end + + @doc """ + Returns array of user's trades. + + **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_trades/2` which is the preferred way of retrieving trades data.** + """ + @spec get_trades(GenServer.server(), XtbClient.Messages.Trades.Query.t()) :: + XtbClient.Messages.TradeInfos.t() + def get_trades(pid, %Trades.Query{} = params) do + GenServer.call(pid, {"getTrades", params}) + end + + @doc """ + Returns array of user's trades which were closed within specified period of time. + """ + @spec get_trades_history(GenServer.server(), XtbClient.Messages.DateRange.t()) :: + XtbClient.Messages.TradeInfos.t() + def get_trades_history(pid, %DateRange{} = params) do + GenServer.call(pid, {"getTradesHistory", params}) + end + + @doc """ + Returns quotes and trading times. + """ + @spec get_trading_hours(GenServer.server(), XtbClient.Messages.TradingHours.Query.t()) :: + XtbClient.Messages.TradingHours.t() + def get_trading_hours(pid, %TradingHours.Query{} = params) do + GenServer.call(pid, {"getTradingHours", params}) + end + + @doc """ + Returns the current API version. + """ + @spec get_version(GenServer.server()) :: XtbClient.Messages.Version.t() + def get_version(pid) do + GenServer.call(pid, {"getVersion"}) + end + + @doc """ + Starts trade transaction. + + `trade_transaction/2` sends main transaction information to the server. + + ## How to verify that the trade request was accepted? + The status field set to 'true' does not imply that the transaction was accepted. It only means, that the server acquired your request and began to process it. + To analyse the status of the transaction (for example to verify if it was accepted or rejected) use the `trade_transaction_status/2` command with the order number that came back with the response of the `trade_transaction/2` command. + """ + @spec trade_transaction(GenServer.server(), XtbClient.Messages.TradeTransaction.Command.t()) :: + XtbClient.Messages.TradeTransaction.t() + def trade_transaction(pid, %TradeTransaction.Command{} = params) do + GenServer.call(pid, {"tradeTransaction", %{tradeTransInfo: params}}) + end + + @doc """ + Returns current transaction status. + + At any time of transaction processing client might check the status of transaction on server side. + In order to do that client must provide unique order taken from `trade_transaction/2` invocation. + """ + @spec trade_transaction_status( + GenServer.server(), + XtbClient.Messages.TradeTransactionStatus.Query.t() + ) :: + XtbClient.Messages.TradeTransactionStatus.t() + def trade_transaction_status(pid, %TradeTransactionStatus.Query{} = params) do + GenServer.call(pid, {"tradeTransactionStatus", params}) + end + + @doc """ + Allows to get actual account indicators values in real-time, as soon as they are available in the system. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.BalanceInfo` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_balance(GenServer.server(), GenServer.server()) :: :ok + def subscribe_get_balance(pid, subscriber) do + GenServer.cast(pid, {:subscribe, {subscriber, StreamingMessage.new("getBalance", "balance")}}) + end + + @doc """ + Subscribes for API chart candles. + The interval of every candle is 1 minute. A new candle arrives every minute. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.Candle` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_candles( + GenServer.server(), + GenServer.server(), + XtbClient.Messages.Candles.Query.t() + ) :: :ok + def subscribe_get_candles(pid, subscriber, %Candles.Query{} = params) do + GenServer.cast( + pid, + {:subscribe, {subscriber, StreamingMessage.new("getCandles", "candle", params)}} + ) + end + + @doc """ + Subscribes for 'keep alive' messages. + A new 'keep alive' message is sent by the API every 3 seconds. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.KeepAlive` struct is sent to the `subscriber` process. + """ + @spec subscribe_keep_alive(GenServer.server(), GenServer.server()) :: :ok + def subscribe_keep_alive(pid, subscriber) do + GenServer.cast( + pid, + {:subscribe, {subscriber, StreamingMessage.new("getKeepAlive", "keepAlive")}} + ) + end + + @doc """ + Subscribes for news. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.NewsInfo` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_news(GenServer.server(), GenServer.server()) :: :ok + def subscribe_get_news(pid, subscriber) do + GenServer.cast(pid, {:subscribe, {subscriber, StreamingMessage.new("getNews", "news")}}) + end + + @doc """ + Subscribes for profits. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.ProfitInfo` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_profits(GenServer.server(), GenServer.server()) :: :ok + def subscribe_get_profits(pid, subscriber) do + GenServer.cast(pid, {:subscribe, {subscriber, StreamingMessage.new("getProfits", "profit")}}) + end + + @doc """ + Establishes subscription for quotations and allows to obtain the relevant information in real-time, as soon as it is available in the system. + The `subscribe_get_tick_prices/3` command can be invoked many times for the same symbol, but only one subscription for a given symbol will be created. + Please beware that when multiple records are available, the order in which they are received is not guaranteed. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.TickPrice` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_tick_prices( + GenServer.server(), + GenServer.server(), + XtbClient.Messages.Quotations.Query.t() + ) :: + :ok + def subscribe_get_tick_prices(pid, subscriber, %Quotations.Query{} = params) do + GenServer.cast( + pid, + {:subscribe, {subscriber, StreamingMessage.new("getTickPrices", "tickPrices", params)}} + ) + end + + @doc """ + Establishes subscription for user trade status data and allows to obtain the relevant information in real-time, as soon as it is available in the system. + Please beware that when multiple records are available, the order in which they are received is not guaranteed. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.TradeInfo` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_trades(GenServer.server(), GenServer.server()) :: :ok + def subscribe_get_trades(pid, subscriber) do + GenServer.cast(pid, {:subscribe, {subscriber, StreamingMessage.new("getTrades", "trade")}}) + end + + @doc """ + Allows to get status for sent trade requests in real-time, as soon as it is available in the system. + Please beware that when multiple records are available, the order in which they are received is not guaranteed. + + Operation is asynchronous, so the immediate response is an `:ok` atom. + When the new data are available, the `XtbClient.Messages.TradeStatus` struct is sent to the `subscriber` process. + """ + @spec subscribe_get_trade_status(GenServer.server(), GenServer.server()) :: :ok + def subscribe_get_trade_status(pid, subscriber) do + GenServer.cast( + pid, + {:subscribe, {subscriber, StreamingMessage.new("getTradeStatus", "tradeStatus")}} + ) + end + + @impl true + def handle_call({method}, {_pid, ref} = from, %State{mpid: mpid, clients: clients} = state) do + ref_string = inspect(ref) + MainSocket.query(mpid, self(), ref_string, method) + + clients = Map.put(clients, ref_string, {from, method, nil}) + state = %State{state | clients: clients} + + {:noreply, state} + end + + @impl true + def handle_call( + {method, params}, + {_pid, ref} = from, + %State{mpid: mpid, clients: clients} = state + ) do + ref_string = inspect(ref) + MainSocket.query(mpid, self(), ref_string, method, params) + + clients = Map.put(clients, ref_string, {from, method, params}) + state = %State{state | clients: clients} + + {:noreply, state} + end + + @impl true + def handle_cast( + {:response, ref, %RateInfos{data: data} = resp} = _message, + %State{clients: clients} = state + ) do + {_client, _method, %{info: %{symbol: symbol}}} = Map.get(clients, ref) + + resp = %RateInfos{ + resp + | data: Enum.map(data, &%Candle{&1 | symbol: symbol}) + } + + state = handle_query_response(ref, resp, state) + + {:noreply, state} + end + + @impl true + def handle_cast({:response, ref, resp} = _message, %State{} = state) do + state = handle_query_response(ref, resp, state) + + {:noreply, state} + end + + @impl true + def handle_cast( + {:stream_session_id, session_id} = _message, + %State{type: type, url: url} = state + ) do + args = %{ + stream_session_id: session_id, + type: type, + url: url + } + + {:ok, spid} = StreamingSocket.start_link(args) + state = %{state | spid: spid} + + {:noreply, state} + end + + @impl true + def handle_cast( + {:subscribe, {subscriber, %StreamingMessage{} = streaming_message}} = _message, + %State{spid: spid, subscribers: subscribers} = state + ) do + StreamingSocket.subscribe(spid, self(), streaming_message) + + token = StreamingMessage.encode_token(streaming_message) + subscribers = Map.put(subscribers, token, subscriber) + state = %State{state | subscribers: subscribers} + + {:noreply, state} + end + + @impl true + def handle_cast( + {:stream_result, {token, result}} = _message, + %State{subscribers: subscribers} = state + ) do + subscriber = Map.get(subscribers, token) + send(subscriber, {:ok, result}) + + {:noreply, state} + end + + defp handle_query_response(ref, response, %State{clients: clients} = state) do + {{client, _method, _params}, clients} = Map.pop!(clients, ref) + GenServer.reply(client, response) + + %State{state | clients: clients} + end + + @impl true + def handle_info({:EXIT, pid, reason}, state) do + Logger.error( + "Module handled exit message from #{inspect(pid)} with reason #{inspect(reason)}." + ) + + {:stop, :shutdown, state} + end +end diff --git a/lib/xtb_client/error.ex b/lib/xtb_client/error.ex deleted file mode 100644 index 4cfe8b6..0000000 --- a/lib/xtb_client/error.ex +++ /dev/null @@ -1,18 +0,0 @@ -defmodule XtbClient.Error do - @moduledoc """ - Struct to represent errors returned by the XTB API - """ - - @enforce_keys [:code, :message] - defstruct [:code, :message] - - def new!(%{ - "errorCode" => code, - "errorDescr" => message - }) do - %__MODULE__{ - code: code, - message: message - } - end -end diff --git a/lib/xtb_client/main_socket.ex b/lib/xtb_client/main_socket.ex index eda43d8..230d972 100644 --- a/lib/xtb_client/main_socket.ex +++ b/lib/xtb_client/main_socket.ex @@ -10,15 +10,13 @@ defmodule XtbClient.MainSocket do """ use WebSockex - alias XtbClient.AccountType - alias XtbClient.Error + alias XtbClient.{AccountType} alias XtbClient.Messages alias XtbClient.RateLimit require Logger @ping_interval 30 * 1000 - @default_query_timeout 10_000 defmodule Config do @type t :: %{ @@ -83,10 +81,10 @@ defmodule XtbClient.MainSocket do @doc """ Starts a `XtbClient.MainSocket` process linked to the calling process. """ - @spec start_link(Config.t(), keyword()) :: GenServer.on_start() - def start_link(args, opts \\ []) do + @spec start_link(Config.t()) :: GenServer.on_start() + def start_link(opts) do %{type: type, url: url, user: user, password: password, app_name: app_name} = - Config.parse(args) + Config.parse(opts) state = %State{ url: url, @@ -98,14 +96,11 @@ defmodule XtbClient.MainSocket do rate_limit: RateLimit.new(200) } - WebSockex.start_link(url, __MODULE__, state, opts) + WebSockex.start_link(url, __MODULE__, state) end @impl WebSockex - def handle_connect( - _conn, - %State{user: user, password: password, app_name: app_name} = state - ) do + def handle_connect(_conn, %State{user: user, password: password, app_name: app_name} = state) do login_args = %{ "userId" => user, "password" => password, @@ -132,353 +127,75 @@ defmodule XtbClient.MainSocket do end @doc """ - Calls query to get streaming session ID. + Casts query to get streaming session ID. ## Arguments - `server` pid of the main socket process, + - `caller` pid of the caller awaiting for the result. - Call to this methods blocks until valid streaming session ID is available - or timeout. - """ - @spec stream_session_id(GenServer.server()) :: - {:ok, String.t() | nil} | {:error, :timeout} | {:error, Error.t()} - def stream_session_id(server) do - ref_string = inspect(make_ref()) - - WebSockex.cast( - server, - {:stream_session_id, {self(), ref_string}} - ) - - receive do - {:"$gen_cast", {:stream_session_id_reply, ^ref_string, response}} -> - {:ok, response} - after - @default_query_timeout -> - {:error, :timeout} - end - end - - @doc """ - Returns array of all symbols available for the user. - """ - @spec get_all_symbols(GenServer.server()) :: - {:ok, Messages.SymbolInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_all_symbols(server) do - handle_query(server, "getAllSymbols") - end - - @doc """ - Returns calendar with market events. - """ - @spec get_calendar(GenServer.server()) :: - {:ok, Messages.CalendarInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_calendar(server) do - handle_query(server, "getCalendar") - end - - @doc """ - Returns chart info from start date to the current time. - - If the chosen period of `XtbClient.Messages.ChartLast.Query` is greater than 1 minute, the last candle returned by the API can change until the end of the period (the candle is being automatically updated every minute). - - Limitations: there are limitations in charts data availability. Detailed ranges for charts data, what can be accessed with specific period, are as follows: - - - PERIOD_M1 --- <0-1) month, i.e. one month time - - PERIOD_M30 --- <1-7) month, six months time - - PERIOD_H4 --- <7-13) month, six months time - - PERIOD_D1 --- 13 month, and earlier on - - Note, that specific PERIOD_ is the lowest (i.e. the most detailed) period, accessible in listed range. For instance, in months range <1-7) you can access periods: PERIOD_M30, PERIOD_H1, PERIOD_H4, PERIOD_D1, PERIOD_W1, PERIOD_MN1. - Specific data ranges availability is guaranteed, however those ranges may be wider, e.g.: PERIOD_M1 may be accessible for 1.5 months back from now, where 1.0 months is guaranteed. - - ## Example scenario: - - * request charts of 5 minutes period, for 3 months time span, back from now; - * response: you are guaranteed to get 1 month of 5 minutes charts; because, 5 minutes period charts are not accessible 2 months and 3 months back from now - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_candles/3` which is the preferred way of retrieving current candle data.** - """ - @spec get_chart_last( - GenServer.server(), - Messages.ChartLast.Query.t() - ) :: {:ok, Messages.RateInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_chart_last(server, %Messages.ChartLast.Query{} = params) do - %Messages.ChartLast.Query{symbol: symbol} = params - - case handle_query(server, "getChartLastRequest", %{info: params}) do - {:ok, %Messages.RateInfos{data: data} = response} -> - response = %Messages.RateInfos{ - response - | data: Enum.map(data, &%Messages.Candle{&1 | symbol: symbol}) - } - - {:ok, response} - - error -> - error - end - end - - @doc """ - Returns chart info with data between given start and end dates. - - Limitations: there are limitations in charts data availability. Detailed ranges for charts data, what can be accessed with specific period, are as follows: - - - PERIOD_M1 --- <0-1) month, i.e. one month time - - PERIOD_M30 --- <1-7) month, six months time - - PERIOD_H4 --- <7-13) month, six months time - - PERIOD_D1 --- 13 month, and earlier on - - Note, that specific PERIOD_ is the lowest (i.e. the most detailed) period, accessible in listed range. For instance, in months range <1-7) you can access periods: PERIOD_M30, PERIOD_H1, PERIOD_H4, PERIOD_D1, PERIOD_W1, PERIOD_MN1. - Specific data ranges availability is guaranteed, however those ranges may be wider, e.g.: PERIOD_M1 may be accessible for 1.5 months back from now, where 1.0 months is guaranteed. - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_candles/3` which is the preferred way of retrieving current candle data.** - """ - @spec get_chart_range(GenServer.server(), Messages.ChartRange.Query.t()) :: - {:ok, Messages.RateInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_chart_range(server, %Messages.ChartRange.Query{} = params) do - %Messages.ChartRange.Query{symbol: symbol} = params - - case handle_query(server, "getChartRangeRequest", %{info: params}) do - {:ok, %Messages.RateInfos{data: data} = response} -> - response = %Messages.RateInfos{ - response - | data: Enum.map(data, &%Messages.Candle{&1 | symbol: symbol}) - } - - {:ok, response} - - error -> - error - end - end - - @doc """ - Returns calculation of commission and rate of exchange. - - The value is calculated as expected value and therefore might not be perfectly accurate. - """ - @spec get_commission_def(GenServer.server(), Messages.SymbolVolume.t()) :: - {:ok, Messages.CommissionDefinition.t()} | {:error, :timeout} | {:error, Error.t()} - def get_commission_def(server, %Messages.SymbolVolume{} = params) do - handle_query(server, "getCommissionDef", params) - end - - @doc """ - Returns information about account currency and account leverage. - """ - @spec get_current_user_data(GenServer.server()) :: - {:ok, Messages.UserInfo.t()} | {:error, :timeout} | {:error, Error.t()} - def get_current_user_data(server) do - handle_query(server, "getCurrentUserData") - end - - @doc """ - Returns IBs data from the given time range. - """ - @spec get_ibs_history(GenServer.server(), Messages.DateRange.t()) :: - {:ok, map()} | {:error, :timeout} | {:error, Error.t()} - def get_ibs_history(server, %Messages.DateRange{} = params) do - handle_query(server, "getIbsHistory", params) - end - - @doc """ - Returns various account indicators. - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_balance/2` which is the preferred way of retrieving current account indicators.** - """ - @spec get_margin_level(GenServer.server()) :: - {:ok, Messages.BalanceInfo.t()} | {:error, :timeout} | {:error, Error.t()} - def get_margin_level(server) do - handle_query(server, "getMarginLevel") - end - - @doc """ - Returns expected margin for given instrument and volume. - - The value is calculated as expected margin value and therefore might not be perfectly accurate. - """ - @spec get_margin_trade(GenServer.server(), Messages.SymbolVolume.t()) :: - {:ok, Messages.MarginTrade.t()} | {:error, :timeout} | {:error, Error.t()} - def get_margin_trade(server, %Messages.SymbolVolume{} = params) do - handle_query(server, "getMarginTrade", params) - end - - @doc """ - Returns news from trading server which were sent within specified period of time. - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_news/2` which is the preferred way of retrieving news data.** - """ - @spec get_news(GenServer.server(), Messages.DateRange.t()) :: - {:ok, Messages.NewsInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_news(server, %Messages.DateRange{} = params) do - handle_query(server, "getNews", params) - end - - @doc """ - Calculates estimated profit for given deal data. - - Should be used for calculator-like apps only. - Profit for opened transactions should be taken from server, due to higher precision of server calculation. - """ - @spec get_profit_calculation(GenServer.server(), Messages.ProfitCalculation.Query.t()) :: - {:ok, Messages.ProfitCalculation.t()} | {:error, :timeout} | {:error, Error.t()} - def get_profit_calculation(server, %Messages.ProfitCalculation.Query{} = params) do - handle_query(server, "getProfitCalculation", params) - end - - @doc """ - Returns current time on trading server. - """ - @spec get_server_time(GenServer.server()) :: - {:ok, Messages.ServerTime.t()} | {:error, :timeout} | {:error, Error.t()} - def get_server_time(server) do - handle_query(server, "getServerTime") - end - - @doc """ - Returns a list of step rules for DMAs. - """ - @spec get_step_rules(GenServer.server()) :: - {:ok, Messages.StepRules.t()} | {:error, :timeout} | {:error, Error.t()} - def get_step_rules(server) do - handle_query(server, "getStepRules") - end - - @doc """ - Returns information about symbol available for the user. - """ - @spec get_symbol(GenServer.server(), Messages.SymbolInfo.Query.t()) :: - {:ok, Messages.SymbolInfo.t()} | {:error, :timeout} | {:error, Error.t()} - def get_symbol(server, %Messages.SymbolInfo.Query{} = params) do - handle_query(server, "getSymbol", params) - end - - @doc """ - Returns array of current quotations for given symbols, only quotations that changed from given timestamp are returned. - - New timestamp obtained from output will be used as an argument of the next call of this command. - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_tick_prices/3` which is the preferred way of retrieving ticks data.** + Result of the query will be delivered to message mailbox of the `caller` process. """ - @spec get_tick_prices(GenServer.server(), Messages.TickPrices.Query.t()) :: - {:ok, Messages.TickPrices.t()} | {:error, :timeout} | {:error, Error.t()} - def get_tick_prices(server, %Messages.TickPrices.Query{} = params) do - handle_query(server, "getTickPrices", params) + @spec stream_session_id(GenServer.server(), GenServer.server()) :: :ok + def stream_session_id(server, caller) do + WebSockex.cast(server, {:stream_session_id, caller}) end @doc """ - Returns array of trades listed in orders query. - """ - @spec get_trade_records(GenServer.server(), Messages.TradeInfos.Query.t()) :: - {:ok, Messages.TradeInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_trade_records(server, %Messages.TradeInfos.Query{} = params) do - handle_query(server, "getTradeRecords", params) - end + Casts query to get data from the backend server. - @doc """ - Returns array of user's trades. - - **Please note that this function can be usually replaced by its streaming equivalent `subscribe_get_trades/2` which is the preferred way of retrieving trades data.** - """ - @spec get_trades(GenServer.server(), Messages.Trades.Query.t()) :: - {:ok, Messages.TradeInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_trades(server, %Messages.Trades.Query{} = params) do - handle_query(server, "getTrades", params) - end + Might be also used to send command to the backend server. - @doc """ - Returns array of user's trades which were closed within specified period of time. - """ - @spec get_trades_history(GenServer.server(), Messages.DateRange.t()) :: - {:ok, Messages.TradeInfos.t()} | {:error, :timeout} | {:error, Error.t()} - def get_trades_history(server, %Messages.DateRange{} = params) do - handle_query(server, "getTradesHistory", params) - end - - @doc """ - Returns quotes and trading times. - """ - @spec get_trading_hours(GenServer.server(), Messages.TradingHours.Query.t()) :: - {:ok, Messages.TradingHours.t()} | {:error, :timeout} | {:error, Error.t()} - def get_trading_hours(server, %Messages.TradingHours.Query{} = params) do - handle_query(server, "getTradingHours", params) - end - - @doc """ - Returns the current API version. - """ - @spec get_version(GenServer.server()) :: - {:ok, Messages.Version.t()} | {:error, :timeout} | {:error, Error.t()} - def get_version(server) do - handle_query(server, "getVersion") - end - - @doc """ - Starts trade transaction. - - `trade_transaction/2` sends main transaction information to the server. + ## Arguments + - `server` pid of the main socket process, + - `caller` pid of the caller awaiting for the result, + - `ref` unique reference of the query, + - `method` name of the query method, + - `params` [optional] arguments of the `method`. - ## How to verify that the trade request was accepted? - The status field set to 'true' does not imply that the transaction was accepted. It only means, that the server acquired your request and began to process it. - To analyse the status of the transaction (for example to verify if it was accepted or rejected) use the `trade_transaction_status/2` command with the order number that came back with the response of the `trade_transaction/2` command. + Result of the query will be delivered to message mailbox of the `caller` process. """ - @spec trade_transaction(GenServer.server(), Messages.TradeTransaction.Command.t()) :: - {:ok, Messages.TradeTransaction.t()} | {:error, :timeout} | {:error, Error.t()} - def trade_transaction(server, %Messages.TradeTransaction.Command{} = params) do - handle_query(server, "tradeTransaction", %{tradeTransInfo: params}) + @spec query(GenServer.server(), GenServer.server(), term(), String.t()) :: :ok + def query(server, caller, ref, method) do + WebSockex.cast(server, {:query, {caller, ref, method}}) end - @doc """ - Returns current transaction status. - - At any time of transaction processing client might check the status of transaction on server side. - In order to do that client must provide unique order taken from `trade_transaction/2` invocation. - """ - @spec trade_transaction_status( - GenServer.server(), - Messages.TradeTransactionStatus.Query.t() - ) :: - {:ok, Messages.TradeTransactionStatus.t()} - | {:error, :timeout} - | {:error, Error.t()} - def trade_transaction_status(server, %Messages.TradeTransactionStatus.Query{} = params) do - handle_query(server, "tradeTransactionStatus", params) + @spec query(GenServer.server(), GenServer.server(), term(), String.t(), map()) :: :ok + def query(server, caller, ref, method, params) do + WebSockex.cast(server, {:query, {caller, ref, method, params}}) end - defp handle_query(server, method, params \\ nil) do - ref_string = inspect(make_ref()) - - WebSockex.cast( - server, - {:query, {self(), ref_string, {method, params}}} - ) - - receive do - {:"$gen_cast", {:response, ^ref_string, response}} -> - {:ok, response} + @impl WebSockex + def handle_cast( + {:stream_session_id, caller}, + %State{stream_session_id: result} = state + ) do + GenServer.cast(caller, {:stream_session_id, result}) - {:"$gen_cast", {:error, ^ref_string, response}} -> - {:error, response} - after - @default_query_timeout -> - {:error, :timeout} - end + {:ok, state} end @impl WebSockex def handle_cast( - {:stream_session_id, {caller, ref}}, - %State{stream_session_id: result} = state + {:query, {caller, ref, method}}, + %State{queries: queries, rate_limit: rate_limit} = state ) do - GenServer.cast(caller, {:stream_session_id_reply, ref, result}) + rate_limit = RateLimit.check_rate(rate_limit) - {:ok, state} + message = encode_command(method, ref) + queries = Map.put(queries, ref, {:query, caller, ref, method}) + + state = %{ + state + | queries: queries, + rate_limit: rate_limit + } + + {:reply, {:text, message}, state} end @impl WebSockex def handle_cast( - {:query, {caller, ref, {method, params}}}, + {:query, {caller, ref, method, params}}, %State{queries: queries, rate_limit: rate_limit} = state ) do rate_limit = RateLimit.check_rate(rate_limit) @@ -486,7 +203,7 @@ defmodule XtbClient.MainSocket do message = encode_command(method, params, ref) queries = Map.put(queries, ref, {:query, caller, ref, method}) - state = %State{ + state = %{ state | queries: queries, rate_limit: rate_limit @@ -500,30 +217,38 @@ defmodule XtbClient.MainSocket do {:reply, frame, state} end - defp encode_command(method, params \\ nil, ref \\ nil) when is_binary(method) do - %{ - command: method, - arguments: params, - customTag: ref - } - |> Map.filter(fn {_, value} -> value != nil end) - |> Jason.encode!() + defp encode_command(type) do + Jason.encode!(%{ + command: type + }) + end + + defp encode_command(type, tag) when is_binary(tag) do + Jason.encode!(%{ + command: type, + customTag: tag + }) + end + + defp encode_command(type, args) when is_map(args) do + Jason.encode!(%{ + command: type, + arguments: args + }) + end + + defp encode_command(type, args, tag) when is_map(args) and is_binary(tag) do + Jason.encode!(%{ + command: type, + arguments: args, + customTag: tag + }) end @impl WebSockex def handle_frame({:text, msg}, state) do - with {:ok, resp} <- Jason.decode(msg), - {response, caller, state} <- handle_response(resp, state), - :ok <- GenServer.cast(caller, response) do - {:ok, state} - else - {:ok, _} = result -> - result - - other -> - Logger.warning("Socket received unknown message: #{inspect(other)}") - {:ok, state} - end + resp = Jason.decode!(msg) + handle_response(resp, state) end defp handle_response( @@ -533,13 +258,14 @@ defmodule XtbClient.MainSocket do {{:query, caller, ^ref, method}, queries} = Map.pop(queries, ref) result = Messages.decode_message(method, data) + GenServer.cast(caller, {:response, ref, result}) - state = %State{state | queries: queries} - {{:response, ref, result}, caller, state} + state = %{state | queries: queries} + {:ok, state} end defp handle_response(%{"status" => true, "streamSessionId" => stream_session_id}, state) do - state = %State{state | stream_session_id: stream_session_id} + state = %{state | stream_session_id: stream_session_id} {:ok, state} end @@ -548,22 +274,17 @@ defmodule XtbClient.MainSocket do end defp handle_response( - %{"status" => false, "customTag" => ref} = response, - %State{queries: queries} = state + %{"status" => false, "errorCode" => code, "errorDescr" => message}, + state ) do - {{:query, caller, ^ref, _method}, queries} = Map.pop(queries, ref) + Logger.error("Exception: #{inspect(%{code: code, message: message})}") - error = Error.new!(response) - Logger.error("Socket received error: #{inspect(error)}") - - state = %State{state | queries: queries} - {{:error, ref, error}, caller, state} + {:close, state} end @impl WebSockex def handle_info({:ping, {:text, _command} = frame, interval} = message, state) do schedule_work(message, interval) - {:reply, frame, state} end end diff --git a/lib/xtb_client/messages/trade_info.ex b/lib/xtb_client/messages/trade_info.ex index 825d938..e2c9135 100644 --- a/lib/xtb_client/messages/trade_info.ex +++ b/lib/xtb_client/messages/trade_info.ex @@ -34,9 +34,6 @@ defmodule XtbClient.Messages.TradeInfo do """ alias XtbClient.Messages.Operation - alias XtbClient.Messages.TradeType - - @type state :: :modified | :deleted @type t :: %__MODULE__{ close_price: float(), @@ -59,13 +56,13 @@ defmodule XtbClient.Messages.TradeInfo do profit: float(), stop_loss: float(), spread: float() | nil, - state: state(), + state: integer() | nil, storage: float(), symbol: String.t() | nil, taxes: float() | nil, timestamp: DateTime.t() | nil, take_profit: float(), - type: TradeType.t(), + type: integer() | nil, volume: float() } @@ -132,9 +129,6 @@ defmodule XtbClient.Messages.TradeInfo do ) do value = args |> Map.drop(["state", "type"]) |> new() - state = state(state) - type = TradeType.parse(type) - %{value | state: state, type: type} end @@ -226,7 +220,4 @@ defmodule XtbClient.Messages.TradeInfo do volume: volume } end - - def state("Modified"), do: :modified - def state("Deleted"), do: :deleted end diff --git a/lib/xtb_client/messages/trade_type.ex b/lib/xtb_client/messages/trade_type.ex index 92b9330..a229337 100644 --- a/lib/xtb_client/messages/trade_type.ex +++ b/lib/xtb_client/messages/trade_type.ex @@ -13,24 +13,6 @@ defmodule XtbClient.Messages.TradeType do @type t :: :open | :pending | :close | :modify | :delete @type trade_code :: 0..4 - @doc """ - Parse integer value as valid atom for trade type. - """ - @spec parse(value :: trade_code()) :: t() - def parse(value) when value in [0, 1, 2, 3, 4] do - parse_type(value) - end - - defp parse_type(value) do - case value do - 0 -> :open - 1 -> :pending - 2 -> :close - 3 -> :modify - 4 -> :delete - end - end - @doc """ Format atom representing trade type to integer value. """ diff --git a/lib/xtb_client/streaming_socket.ex b/lib/xtb_client/streaming_socket.ex index 69d930c..40b945b 100644 --- a/lib/xtb_client/streaming_socket.ex +++ b/lib/xtb_client/streaming_socket.ex @@ -2,18 +2,14 @@ defmodule XtbClient.StreamingSocket do @moduledoc """ WebSocket server used for asynchronous communication. - `StreamingSocket` is being used like standard `GenServer` - could be started with `start_link/2` and supervised. + `StreamingSocket` is being used like standard `GenServer` - could be started with `start_link/1` and supervised. After successful connection to WebSocket the flow is: - - process schedules to itself the `ping` command (with recurring interval) - to maintain persistent connection with backend - - process waits for subscription requests from other processes - - when subscription request is received, process sends subscription command to WebSocket - - when response from WebSocket is received, process sends result to caller process via `handle_message/2` callback + - process schedules to itself the `ping` command (with recurring interval) - to maintain persistent connection with backend. """ use WebSockex alias XtbClient.{AccountType, StreamingMessage} - alias XtbClient.Error alias XtbClient.Messages alias XtbClient.RateLimit @@ -22,13 +18,10 @@ defmodule XtbClient.StreamingSocket do @ping_interval 30 * 1000 defmodule Config do - @moduledoc false - @type t :: %{ :url => String.t() | URI.t(), :type => AccountType.t(), - :stream_session_id => String.t(), - :module => module() + :stream_session_id => String.t() } def parse(opts) do @@ -37,90 +30,44 @@ defmodule XtbClient.StreamingSocket do %{ url: get_in(opts, [:url]) |> URI.merge(type) |> URI.to_string(), type: type, - stream_session_id: get_in(opts, [:stream_session_id]), - module: get_in(opts, [:module]) + stream_session_id: get_in(opts, [:stream_session_id]) } end end defmodule State do - @moduledoc false - @enforce_keys [ :url, :stream_session_id, - :module, :subscriptions, :rate_limit ] defstruct url: nil, stream_session_id: nil, - module: nil, subscriptions: %{}, rate_limit: nil end - @doc """ - Callback invoked when message from WebSocket is received. - - ## Params: - - `token` - unique token of the subscribed method & params, - - `message` - struct with response data - """ - @callback handle_message( - token :: StreamingMessage.token(), - message :: struct() - ) :: :ok - - @doc """ - Callback invoked when error is received from WebSocket. - """ - @callback handle_error(error :: Error.t()) :: :ok - - @doc false - defmacro __using__(_opts) do - quote location: :keep do - @behaviour XtbClient.StreamingSocket - - @doc false - def handle_message(token, message) do - raise "No handle_message/2 clause in #{__MODULE__} provided for #{inspect(message)}" - end - - @doc false - def handle_error(error) do - raise "No handle_error/1 clause in #{__MODULE__} provided for #{inspect(error)}" - end - - defoverridable handle_message: 2, handle_error: 1 - end - end - @doc """ Starts a `XtbClient.StreamingSocket` process linked to the calling process. """ @spec start_link(Config.t()) :: GenServer.on_start() - def start_link(args, _opts \\ []) do - %{url: url, stream_session_id: stream_session_id, module: module} = - Config.parse(args) - - state = - %State{ - url: url, - stream_session_id: stream_session_id, - module: module, - subscriptions: %{}, - rate_limit: RateLimit.new(200) - } + def start_link(opts) do + %{url: url, stream_session_id: stream_session_id} = + Config.parse(opts) + + state = %State{ + url: url, + stream_session_id: stream_session_id, + subscriptions: %{}, + rate_limit: RateLimit.new(200) + } WebSockex.start_link(url, __MODULE__, state) end @impl WebSockex - def handle_connect( - _conn, - %State{stream_session_id: stream_session_id} = state - ) do + def handle_connect(_conn, %State{stream_session_id: stream_session_id} = state) do ping_command = encode_streaming_command({"ping", nil}, stream_session_id) ping_message = {:ping, {:text, ping_command}, @ping_interval} schedule_work(ping_message, 1) @@ -133,183 +80,52 @@ defmodule XtbClient.StreamingSocket do end @doc """ - Allows to get actual account indicators values in real-time, as soon as they are available in the system. + Subscribes `pid` process for messages from `method` query. - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.BalanceInfo` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_balance(socket :: GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_balance(socket) do - with message <- StreamingMessage.new("getBalance", "balance"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Subscribes for API chart candles. - The interval of every candle is 1 minute. A new candle arrives every minute. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_candles( - GenServer.server(), - XtbClient.Messages.Candles.Query.t() - ) :: {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_candles(socket, %Messages.Candles.Query{} = params) do - with message <- StreamingMessage.new("getCandles", "candle", params), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Subscribes for 'keep alive' messages. - A new 'keep alive' message is sent by the API every 3 seconds. + ## Arguments + - `server` pid of the streaming socket process, + - `caller` pid of the caller awaiting for the result, + - `message` struct with call context, see `XtbClient.StreamingMessage`. - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. + Result of the query will be delivered to message mailbox of the `caller` process. """ - @spec subscribe_keep_alive(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_keep_alive(socket) do - with message <- StreamingMessage.new("getKeepAlive", "keepAlive"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Subscribes for news. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_news(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_news(socket) do - with message <- StreamingMessage.new("getNews", "news"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Subscribes for profits. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_profits(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_profits(socket) do - with message <- StreamingMessage.new("getProfits", "profit"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Establishes subscription for quotations and allows to obtain the relevant information in real-time, as soon as it is available in the system. - The `subscribe_get_tick_prices/3` command can be invoked many times for the same symbol, but only one subscription for a given symbol will be created. - Please beware that when multiple records are available, the order in which they are received is not guaranteed. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_tick_prices( - GenServer.server(), - XtbClient.Messages.Quotations.Query.t() - ) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params) do - with message <- StreamingMessage.new("getTickPrices", "tickPrices", params), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Establishes subscription for user trade status data and allows to obtain the relevant information in real-time, as soon as it is available in the system. - Please beware that when multiple records are available, the order in which they are received is not guaranteed. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_trades(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_trades(socket) do - with message <- StreamingMessage.new("getTrades", "trade"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end - end - - @doc """ - Allows to get status for sent trade requests in real-time, as soon as it is available in the system. - Please beware that when multiple records are available, the order in which they are received is not guaranteed. - - Operation is asynchronous, so the immediate response is an `{:ok, token}` tuple, where token is a unique hash of subscribed operation. - When the new data are available, the `XtbClient.Messages.Candle` struct is sent via `handle_message/2` callback. - """ - @spec subscribe_get_trade_status(GenServer.server()) :: - {:ok, StreamingMessage.token()} | {:error, term()} - def subscribe_get_trade_status(socket) do - with message <- StreamingMessage.new("getTradeStatus", "tradeStatus"), - token <- StreamingMessage.encode_token(message), - :ok <- WebSockex.cast(socket, {:subscribe, message}) do - {:ok, token} - else - err -> {:error, err} - end + @spec subscribe(GenServer.server(), GenServer.server(), StreamingMessage.t()) :: :ok + def subscribe( + server, + caller, + %StreamingMessage{} = message + ) do + WebSockex.cast(server, {:subscribe, {caller, message}}) end @impl WebSockex def handle_cast( - { - :subscribe, + {:subscribe, + {caller, %StreamingMessage{ method: method, response_method: response_method, params: params - } = message - }, + } = message}}, %State{ subscriptions: subscriptions, rate_limit: rate_limit, stream_session_id: session_id - } = state + } = + state ) do rate_limit = RateLimit.check_rate(rate_limit) + token = StreamingMessage.encode_token(message) + subscriptions = - Map.put( + Map.update( subscriptions, response_method, - StreamingMessage.encode_token(message) + {method, %{token => caller}}, + fn {method, value} -> + {method, Map.put(value, token, caller)} + end ) encoded_message = encode_streaming_command({method, params}, session_id) @@ -318,40 +134,45 @@ defmodule XtbClient.StreamingSocket do {:reply, {:text, encoded_message}, state} end - defp encode_streaming_command({method, params}, streaming_session_id) - when is_binary(method) and is_binary(streaming_session_id) do - params = if params == nil, do: %{}, else: Map.from_struct(params) + defp encode_streaming_command({method, nil}, streaming_session_id) do + Jason.encode!(%{ + command: method, + streamSessionId: streaming_session_id + }) + end + defp encode_streaming_command({method, params}, streaming_session_id) do %{ command: method, streamSessionId: streaming_session_id } - |> Map.merge(params) + |> Map.merge(Map.from_struct(params)) |> Jason.encode!() end @impl WebSockex - def handle_frame({:text, msg}, %State{module: module} = state) do - with {:ok, resp} <- Jason.decode(msg), - {:ok, {token, message}} <- handle_response(resp, state), - :ok <- module.handle_message(token, message) do - {:ok, state} - else - {:error, error} -> - module.handle_error(error) - {:ok, state} - end + def handle_frame({:text, msg}, state) do + resp = Jason.decode!(msg) + handle_response(resp, state) end defp handle_response( %{"command" => response_method, "data" => data}, - %State{subscriptions: subscriptions} = _state + %State{subscriptions: subscriptions} = state ) do - with token <- Map.get(subscriptions, response_method), - method <- StreamingMessage.decode_method_name(token), - result <- Messages.decode_message(method, data) do - {:ok, {token, result}} - end + {method, method_subs} = Map.get(subscriptions, response_method) + result = Messages.decode_message(method, data) + + token = + method + |> StreamingMessage.new(response_method, result) + |> StreamingMessage.encode_token() + + caller = Map.get(method_subs, token) + + GenServer.cast(caller, {:stream_result, {token, result}}) + + {:ok, state} end defp handle_response(%{"status" => true}, state) do @@ -359,19 +180,17 @@ defmodule XtbClient.StreamingSocket do end defp handle_response( - %{"status" => false} = response, - _state + %{"status" => false, "errorCode" => code, "errorDescr" => message}, + state ) do - error = Error.new!(response) - Logger.error("Socket received error: #{inspect(error)}") + Logger.error("Exception: #{inspect(%{code: code, message: message})}") - {:error, error} + {:close, state} end @impl WebSockex def handle_info({:ping, {:text, _command} = frame, interval} = message, state) do schedule_work(message, interval) - {:reply, frame, state} end end diff --git a/mix.exs b/mix.exs index 9baa5b7..211b4bb 100644 --- a/mix.exs +++ b/mix.exs @@ -27,6 +27,14 @@ defmodule XtbClient.MixProject do ] end + # Run "mix help compile.app" to learn about applications. + def application do + [ + extra_applications: [:logger], + mod: {XtbClient.Application, []} + ] + end + # Specifies which paths to compile per environment. defp elixirc_paths(:test), do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] diff --git a/test/support/fixtures/main_socket_e2e_fixtures.ex b/test/support/fixtures/main_socket_e2e_fixtures.ex deleted file mode 100644 index af104d0..0000000 --- a/test/support/fixtures/main_socket_e2e_fixtures.ex +++ /dev/null @@ -1,42 +0,0 @@ -defmodule XtbClient.MainSocket.E2EFixtures do - @moduledoc false - - alias XtbClient.MainSocket - alias XtbClient.Messages - - def poll_stream_session_id(server) do - case MainSocket.stream_session_id(server) do - {:ok, nil} -> - Process.sleep(100) - poll_stream_session_id(server) - - {:ok, _session_id} = result -> - result - end - end - - def open_trade(pid, buy_args) do - buy = Messages.TradeTransaction.Command.new(buy_args) - MainSocket.trade_transaction(pid, buy) - end - - def close_trade(pid, open_order_id, close_args) do - # 1. way - get all opened only trades - trades_query = Messages.Trades.Query.new(true) - {:ok, result} = MainSocket.get_trades(pid, trades_query) - - position_to_close = Enum.find(result.data, &(&1.order_closed == open_order_id)) - - close_args = - Map.merge( - close_args, - %{ - price: position_to_close.open_price - 0.01, - order: position_to_close.order_opened - } - ) - - close = Messages.TradeTransaction.Command.new(close_args) - MainSocket.trade_transaction(pid, close) - end -end diff --git a/test/support/fixtures/transaction_helper.ex b/test/support/fixtures/transaction_helper.ex new file mode 100644 index 0000000..b68b240 --- /dev/null +++ b/test/support/fixtures/transaction_helper.ex @@ -0,0 +1,36 @@ +defmodule XtbClient.TransactionHelper do + @moduledoc false + alias XtbClient.Connection + + alias XtbClient.Messages.{ + TradeTransaction, + Trades + } + + def open_trade(pid, buy_args) do + buy = TradeTransaction.Command.new(buy_args) + result = Connection.trade_transaction(pid, buy) + + result.order + end + + def close_trade(pid, open_order_id, close_args) do + # 1. way - get all opened only trades + trades_query = Trades.Query.new(true) + result = Connection.get_trades(pid, trades_query) + + position_to_close = + result.data + |> Enum.find(&(&1.order_closed == open_order_id)) + + close_args = + close_args + |> Map.merge(%{ + price: position_to_close.open_price - 0.01, + order: position_to_close.order_opened + }) + + close = TradeTransaction.Command.new(close_args) + Connection.trade_transaction(pid, close) + end +end diff --git a/test/support/mocks/streaming_socket_mock.ex b/test/support/mocks/streaming_socket_mock.ex deleted file mode 100644 index 9cfc07e..0000000 --- a/test/support/mocks/streaming_socket_mock.ex +++ /dev/null @@ -1,47 +0,0 @@ -defmodule XtbClient.StreamingSocketMock do - @moduledoc false - - use XtbClient.StreamingSocket - - @store_mock XtbClient.StreamingTestStoreMock - - @impl XtbClient.StreamingSocket - def handle_message(_token, message) do - alive? = Process.whereis(@store_mock) - - case alive? do - nil -> - :ok - - _pid -> - parent_pid = - Agent.get( - @store_mock, - fn %{parent_pid: pid} -> pid end - ) - - send(parent_pid, {:ok, message}) - :ok - end - end - - @impl XtbClient.StreamingSocket - def handle_error(error) do - alive? = Process.whereis(@store_mock) - - case alive? do - nil -> - :ok - - _pid -> - parent_pid = - Agent.get( - @store_mock, - fn %{parent_pid: pid} -> pid end - ) - - send(parent_pid, {:error, error}) - :ok - end - end -end diff --git a/test/support/mocks/streaming_test_store_mock.ex b/test/support/mocks/streaming_test_store_mock.ex deleted file mode 100644 index 8d0470a..0000000 --- a/test/support/mocks/streaming_test_store_mock.ex +++ /dev/null @@ -1,8 +0,0 @@ -defmodule XtbClient.StreamingTestStoreMock do - @moduledoc false - use Agent - - def start_link(_opts) do - Agent.start_link(fn -> %{parent_pid: nil} end, name: __MODULE__) - end -end diff --git a/test/xtb_client/connection_test.exs b/test/xtb_client/connection_test.exs new file mode 100644 index 0000000..6eefd56 --- /dev/null +++ b/test/xtb_client/connection_test.exs @@ -0,0 +1,534 @@ +defmodule XtbClient.ConnectionTest do + @moduledoc false + use ExUnit.Case, async: true + doctest XtbClient.Connection + + alias XtbClient.Connection + + alias XtbClient.Messages.{ + BalanceInfo, + CalendarInfos, + CalendarInfo, + Candle, + Candles, + ChartLast, + ChartRange, + CommissionDefinition, + DateRange, + KeepAlive, + MarginTrade, + NewsInfos, + NewsInfo, + ProfitCalculation, + ProfitInfo, + Quote, + Quotations, + RateInfos, + ServerTime, + StepRules, + StepRule, + Step, + SymbolInfo, + SymbolInfos, + SymbolVolume, + TickPrices, + TickPrice, + TradeInfos, + TradeInfo, + Trades, + TradeStatus, + TradeTransaction, + TradeTransactionStatus, + TradingHours, + TradingHour, + UserInfo, + Version + } + + import XtbClient.TransactionHelper + + @default_wait_time 60 * 1000 + + setup do + Dotenvy.source([ + ".env.#{Mix.env()}", + ".env.#{Mix.env()}.override", + System.get_env() + ]) + + url = Dotenvy.env!("XTB_API_URL", :string!) + user = Dotenvy.env!("XTB_API_USERNAME", :string!) + passwd = Dotenvy.env!("XTB_API_PASSWORD", :string!) + + params = [ + connection: %{ + url: url, + user: user, + password: passwd, + type: :demo, + app_name: "XtbClient" + } + ] + + {:ok, pid} = start_supervised({Connection, params}) + + {:ok, %{params: params, pid: pid}} + end + + test "starts new connection with a name", %{params: params} do + params = params |> Keyword.put(:name, :test_connection) + {:ok, pid} = Connection.start_link(params) + + assert Process.whereis(:test_connection) == pid + end + + test "get all symbols", %{pid: pid} do + result = Connection.get_all_symbols(pid) + + assert %SymbolInfos{} = result + assert [elem | _] = result.data + assert %SymbolInfo{} = elem + end + + test "get calendar", %{pid: pid} do + result = Connection.get_calendar(pid) + + assert %CalendarInfos{} = result + assert [elem | _] = result.data + assert %CalendarInfo{} = elem + end + + test "get chart last", %{pid: pid} do + now = DateTime.utc_now() + + args = %{ + period: :h1, + start: DateTime.add(now, -30 * 24 * 60 * 60), + symbol: "EURPLN" + } + + query = ChartLast.Query.new(args) + result = Connection.get_chart_last(pid, query) + + assert %RateInfos{} = result + assert is_number(result.digits) + assert [elem | _] = result.data + + assert %Candle{ + symbol: symbol, + open: open, + high: high, + low: low, + close: close, + vol: vol, + ctm: ctm, + ctm_string: ctm_string, + quote_id: quote_id + } = elem + + assert "EURPLN" == symbol + assert is_number(open) + assert is_number(high) + assert is_number(low) + assert is_number(close) + assert is_number(vol) + assert DateTime.compare(ctm, now) == :lt + assert is_binary(ctm_string) + refute quote_id + end + + test "get chart range", %{pid: pid} do + now = DateTime.utc_now() + + args = %{ + range: + DateRange.new(%{ + from: DateTime.add(now, -2 * 30 * 24 * 60 * 60), + to: now + }), + period: :h1, + symbol: "EURPLN" + } + + query = ChartRange.Query.new(args) + result = Connection.get_chart_range(pid, query) + + assert %RateInfos{} = result + assert is_number(result.digits) + assert [elem | _] = result.data + + assert %Candle{ + symbol: symbol, + open: open, + high: high, + low: low, + close: close, + vol: vol, + ctm: ctm, + ctm_string: ctm_string, + quote_id: quote_id + } = elem + + assert "EURPLN" == symbol + assert is_number(open) + assert is_number(high) + assert is_number(low) + assert is_number(close) + assert is_number(vol) + assert DateTime.compare(ctm, now) == :lt + assert is_binary(ctm_string) + refute quote_id + end + + test "get commission definition", %{pid: pid} do + args = %{symbol: "EURPLN", volume: 1} + query = SymbolVolume.new(args) + result = Connection.get_commission_def(pid, query) + + assert %CommissionDefinition{} = result + end + + test "get current user data", %{pid: pid} do + result = Connection.get_current_user_data(pid) + + assert %UserInfo{} = result + end + + test "get margin level", %{pid: pid} do + result = Connection.get_margin_level(pid) + + assert %BalanceInfo{} = result + end + + test "get margin trade", %{pid: pid} do + args = %{symbol: "EURPLN", volume: 1} + query = SymbolVolume.new(args) + result = Connection.get_margin_trade(pid, query) + + assert %MarginTrade{} = result + end + + test "get news", %{pid: pid} do + args = %{ + from: DateTime.utc_now() |> DateTime.add(-2 * 30 * 24 * 60 * 60), + to: DateTime.utc_now() + } + + query = DateRange.new(args) + result = Connection.get_news(pid, query) + + assert %NewsInfos{} = result + assert [elem | _] = result.data + assert %NewsInfo{} = elem + end + + test "get profit calculation", %{pid: pid} do + args = %{ + open_price: 1.2233, + close_price: 1.3, + operation: :buy, + symbol: "EURPLN", + volume: 1.0 + } + + query = ProfitCalculation.Query.new(args) + result = Connection.get_profit_calculation(pid, query) + + assert %ProfitCalculation{} = result + end + + test "get server time", %{pid: pid} do + result = Connection.get_server_time(pid) + + assert %ServerTime{} = result + end + + test "get step rules", %{pid: pid} do + result = Connection.get_step_rules(pid) + + assert %StepRules{} = result + assert [elem | _] = result.data + assert %StepRule{steps: [step | _]} = elem + assert %Step{} = step + end + + test "get symbol", %{pid: pid} do + query = SymbolInfo.Query.new("BHW.PL_9") + result = Connection.get_symbol(pid, query) + + assert %SymbolInfo{} = result + end + + test "get tick prices", %{pid: pid} do + args = %{ + level: 0, + symbols: ["EURPLN"], + timestamp: DateTime.utc_now() |> DateTime.add(-2 * 60) + } + + query = TickPrices.Query.new(args) + result = Connection.get_tick_prices(pid, query) + + assert %TickPrices{} = result + assert [elem | _] = result.data + assert %TickPrice{} = elem + end + + test "get trades history", %{pid: pid} do + args = %{ + from: DateTime.utc_now() |> DateTime.add(-3 * 31 * 24 * 60 * 60), + to: DateTime.utc_now() + } + + query = DateRange.new(args) + result = Connection.get_trades_history(pid, query) + + assert %TradeInfos{} = result + assert [elem | _] = result.data + assert %TradeInfo{} = elem + end + + test "get trading hours", %{pid: pid} do + args = ["EURPLN", "AGO.PL_9"] + query = TradingHours.Query.new(args) + + result = Connection.get_trading_hours(pid, query) + + assert %TradingHours{} = result + assert [elem | _] = result.data + assert %TradingHour{} = elem + assert [qu | _] = elem.quotes + assert [trading | _] = elem.trading + assert %Quote{} = qu + assert %Quote{} = trading + end + + test "get version", %{pid: pid} do + result = Connection.get_version(pid) + + assert %Version{} = result + end + + test "trade transaction - open and close transaction", %{pid: pid} do + # needed to wait for message to be received from server that transaction is accepted + Connection.subscribe_get_trade_status(pid, self()) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 0.5 + } + + buy = TradeTransaction.Command.new(buy_args) + result = Connection.trade_transaction(pid, buy) + + assert %TradeTransaction{} = result + + assert_receive {:ok, %TradeStatus{}}, @default_wait_time + + open_order_id = result.order + status = TradeTransactionStatus.Query.new(open_order_id) + result = Connection.trade_transaction_status(pid, status) + + assert %TradeTransactionStatus{} = result + + # get all opened only trades + trades_query = Trades.Query.new(true) + result = Connection.get_trades(pid, trades_query) + + assert %TradeInfos{} = result + + position_to_close = + result.data + |> Enum.find(&(&1.order_closed == open_order_id)) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + price: position_to_close.open_price - 0.01, + symbol: "LITECOIN", + order: position_to_close.order_opened, + type: :close, + volume: 0.5 + } + + close = TradeTransaction.Command.new(close_args) + result = Connection.trade_transaction(pid, close) + + assert %TradeTransaction{} = result + assert_receive {:ok, %TradeStatus{}}, @default_wait_time + + close_order_id = result.order + status = TradeTransactionStatus.Query.new(close_order_id) + result = Connection.trade_transaction_status(pid, status) + + assert %TradeTransactionStatus{status: :accepted} = result + end + + test "subscribe to get balance", %{pid: pid} do + Connection.subscribe_get_balance(pid, self()) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 0.5 + } + + buy = TradeTransaction.Command.new(buy_args) + result = Connection.trade_transaction(pid, buy) + + assert %TradeTransaction{} = result + open_order_id = result.order + + assert_receive {:ok, %BalanceInfo{}}, @default_wait_time + + # get all opened only trades + trades_query = Trades.Query.new(true) + result = Connection.get_trades(pid, trades_query) + + assert %TradeInfos{} = result + + position_to_close = + result.data + |> Enum.find(&(&1.order_closed == open_order_id)) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + price: position_to_close.open_price - 0.01, + symbol: "LITECOIN", + order: position_to_close.order_opened, + type: :close, + volume: 0.5 + } + + close = TradeTransaction.Command.new(close_args) + result = Connection.trade_transaction(pid, close) + assert %TradeTransaction{} = result + + assert_receive {:ok, %BalanceInfo{}}, @default_wait_time + end + + @tag timeout: @default_wait_time + test "subscribe to get candles", %{pid: pid} do + args = "LITECOIN" + query = Candles.Query.new(args) + Connection.subscribe_get_candles(pid, self(), query) + + assert_receive {:ok, %Candle{}}, @default_wait_time + end + + test "subscribe to keep alive", %{pid: pid} do + Connection.subscribe_keep_alive(pid, self()) + + assert_receive {:ok, %KeepAlive{}}, @default_wait_time + end + + @tag skip: true + test "subscribe to get news", %{pid: pid} do + Connection.subscribe_get_news(pid, self()) + + assert_receive {:ok, %NewsInfo{}}, @default_wait_time + end + + test "subscribe to get profits", %{pid: pid} do + Connection.subscribe_get_profits(pid, self()) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 0.5 + } + + order_id = open_trade(pid, buy_args) + + assert_receive {:ok, %ProfitInfo{}}, @default_wait_time + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 0.5 + } + + close_trade(pid, order_id, close_args) + + assert_receive {:ok, %ProfitInfo{}}, @default_wait_time + end + + test "subscribe to get tick prices", %{pid: pid} do + args = %{symbol: "LITECOIN"} + query = Quotations.Query.new(args) + Connection.subscribe_get_tick_prices(pid, self(), query) + + assert_receive {:ok, %TickPrice{}}, @default_wait_time + end + + test "subscribe to get trades", %{pid: pid} do + Connection.subscribe_get_trades(pid, self()) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 0.5 + } + + order_id = open_trade(pid, buy_args) + + assert_receive {:ok, %TradeInfo{}}, @default_wait_time + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 0.5 + } + + close_trade(pid, order_id, close_args) + + assert_receive {:ok, %TradeInfo{}}, @default_wait_time + end + + test "subscribe to trade status", %{pid: pid} do + Connection.subscribe_get_trade_status(pid, self()) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 0.5 + } + + order_id = open_trade(pid, buy_args) + + assert_receive {:ok, %TradeStatus{}}, @default_wait_time + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 0.5 + } + + close_trade(pid, order_id, close_args) + + assert_receive {:ok, %TradeStatus{}}, @default_wait_time + end +end diff --git a/test/xtb_client/main_socket_test.exs b/test/xtb_client/main_socket_test.exs index 5ead5aa..d316bf4 100644 --- a/test/xtb_client/main_socket_test.exs +++ b/test/xtb_client/main_socket_test.exs @@ -1,50 +1,9 @@ defmodule XtbClient.MainSocketTest do @moduledoc false - use ExUnit.Case + use ExUnit.Case, async: true doctest XtbClient.MainSocket alias XtbClient.MainSocket - alias XtbClient.StreamingSocket - alias XtbClient.StreamingSocketMock - alias XtbClient.StreamingTestStoreMock - - import XtbClient.MainSocket.E2EFixtures - - alias XtbClient.Messages.{ - BalanceInfo, - CalendarInfos, - CalendarInfo, - Candle, - ChartLast, - ChartRange, - CommissionDefinition, - DateRange, - MarginTrade, - NewsInfos, - NewsInfo, - ProfitCalculation, - Quote, - RateInfos, - ServerTime, - StepRules, - StepRule, - Step, - SymbolInfo, - SymbolInfos, - SymbolVolume, - TickPrices, - TickPrice, - TradeInfos, - TradeInfo, - Trades, - TradeStatus, - TradeTransaction, - TradeTransactionStatus, - TradingHours, - TradingHour, - UserInfo, - Version - } setup do Dotenvy.source([ @@ -59,306 +18,30 @@ defmodule XtbClient.MainSocketTest do params = %{ url: url, - type: :demo, user: user, password: passwd, + type: :demo, app_name: "XtbClient" } - {:ok, pid} = start_supervised({MainSocket, params}) - - {:ok, %{params: params, pid: pid}} + {:ok, %{params: params}} end - describe "public API" do - test "stream_session_id is present", %{pid: pid} do - # needed to wait for socket to connect - # during that time stream_session_id should be available - Process.sleep(100) - - {:ok, stream_session_id} = MainSocket.stream_session_id(pid) - assert is_binary(stream_session_id) - end - - @tag timeout: 40 * 1000 - test "sends ping after login", %{pid: pid} do - Process.sleep(30 * 1000 + 1) - - assert Process.alive?(pid) == true - end - - test "get all symbols", %{pid: pid} do - assert {:ok, %SymbolInfos{data: data}} = MainSocket.get_all_symbols(pid) - assert [elem | _] = data - assert %SymbolInfo{} = elem - end - - test "get calendar", %{pid: pid} do - assert {:ok, %CalendarInfos{data: data}} = MainSocket.get_calendar(pid) - assert [elem | _] = data - assert %CalendarInfo{} = elem - end - - test "get chart last", %{pid: pid} do - now = DateTime.utc_now() - - args = %{ - period: :h1, - start: DateTime.add(now, -30 * 24 * 60 * 60), - symbol: "EURPLN" - } - - query = ChartLast.Query.new(args) - - assert {:ok, %RateInfos{data: data, digits: digits}} = MainSocket.get_chart_last(pid, query) - assert is_number(digits) - assert [elem | _] = data - - assert %Candle{ - symbol: symbol, - open: open, - high: high, - low: low, - close: close, - vol: vol, - ctm: ctm, - ctm_string: ctm_string, - quote_id: quote_id - } = elem - - assert "EURPLN" == symbol - assert is_number(open) - assert is_number(high) - assert is_number(low) - assert is_number(close) - assert is_number(vol) - assert DateTime.compare(ctm, now) == :lt - assert is_binary(ctm_string) - refute quote_id - end - - test "get chart range", %{pid: pid} do - now = DateTime.utc_now() - - args = %{ - range: - DateRange.new(%{ - from: DateTime.add(now, -2 * 30 * 24 * 60 * 60), - to: now - }), - period: :h1, - symbol: "EURPLN" - } - - query = ChartRange.Query.new(args) - - assert {:ok, %RateInfos{data: data, digits: digits}} = - MainSocket.get_chart_range(pid, query) - - assert is_number(digits) - assert [elem | _] = data - - assert %Candle{ - symbol: symbol, - open: open, - high: high, - low: low, - close: close, - vol: vol, - ctm: ctm, - ctm_string: ctm_string, - quote_id: quote_id - } = elem - - assert "EURPLN" == symbol - assert is_number(open) - assert is_number(high) - assert is_number(low) - assert is_number(close) - assert is_number(vol) - assert DateTime.compare(ctm, now) == :lt - assert is_binary(ctm_string) - refute quote_id - end - - test "get commission definition", %{pid: pid} do - args = %{symbol: "EURPLN", volume: 1} - query = SymbolVolume.new(args) - - assert {:ok, %CommissionDefinition{}} = MainSocket.get_commission_def(pid, query) - end - - test "get current user data", %{pid: pid} do - assert {:ok, %UserInfo{}} = MainSocket.get_current_user_data(pid) - end - - test "get margin level", %{pid: pid} do - assert {:ok, %BalanceInfo{}} = MainSocket.get_margin_level(pid) - end - - test "get margin trade", %{pid: pid} do - args = %{symbol: "EURPLN", volume: 1} - query = SymbolVolume.new(args) - - assert {:ok, %MarginTrade{}} = MainSocket.get_margin_trade(pid, query) - end - - test "get news", %{pid: pid} do - args = %{ - from: DateTime.utc_now() |> DateTime.add(-2 * 30 * 24 * 60 * 60), - to: DateTime.utc_now() - } - - query = DateRange.new(args) + test "logs in to account", %{params: params} do + {:ok, pid} = MainSocket.start_link(params) - assert {:ok, %NewsInfos{data: data}} = MainSocket.get_news(pid, query) - assert [elem | _] = data - assert %NewsInfo{} = elem - end + Process.sleep(100) - test "get profit calculation", %{pid: pid} do - args = %{ - open_price: 1.2233, - close_price: 1.3, - operation: :buy, - symbol: "EURPLN", - volume: 1.0 - } - - query = ProfitCalculation.Query.new(args) - - assert {:ok, %ProfitCalculation{}} = MainSocket.get_profit_calculation(pid, query) - end - - test "get server time", %{pid: pid} do - assert {:ok, %ServerTime{}} = MainSocket.get_server_time(pid) - end - - test "get step rules", %{pid: pid} do - assert {:ok, %StepRules{data: data}} = MainSocket.get_step_rules(pid) - assert [elem | _] = data - assert %StepRule{steps: [step | _]} = elem - assert %Step{} = step - end - - test "get symbol", %{pid: pid} do - query = SymbolInfo.Query.new("BHW.PL_9") - - assert {:ok, %SymbolInfo{}} = MainSocket.get_symbol(pid, query) - end - - test "get tick prices", %{pid: pid} do - args = %{ - level: 0, - symbols: ["EURPLN"], - timestamp: DateTime.utc_now() |> DateTime.add(-2 * 60) - } - - query = TickPrices.Query.new(args) - - assert {:ok, %TickPrices{data: data}} = MainSocket.get_tick_prices(pid, query) - assert [elem | _] = data - assert %TickPrice{} = elem - end - - test "get trades history", %{pid: pid} do - args = %{ - from: DateTime.utc_now() |> DateTime.add(-3 * 31 * 24 * 60 * 60), - to: DateTime.utc_now() - } - - query = DateRange.new(args) - - assert {:ok, %TradeInfos{data: data}} = MainSocket.get_trades_history(pid, query) - assert [elem | _] = data - assert %TradeInfo{} = elem - end - - test "get trading hours", %{pid: pid} do - args = ["EURPLN", "AGO.PL_9"] - query = TradingHours.Query.new(args) - - assert {:ok, %TradingHours{data: data}} = MainSocket.get_trading_hours(pid, query) - assert [elem | _] = data - assert %TradingHour{} = elem - assert [qu | _] = elem.quotes - assert [trading | _] = elem.trading - assert %Quote{} = qu - assert %Quote{} = trading - end - - test "get version", %{pid: pid} do - assert {:ok, %Version{}} = MainSocket.get_version(pid) - end + MainSocket.stream_session_id(pid, self()) + assert_receive {:"$gen_cast", {:stream_session_id, _}} end - @default_wait_time 60 * 1000 - - describe "trade transaction with async messages" do - setup %{pid: pid, params: params} do - {:ok, _store} = start_supervised(StreamingTestStoreMock) - - parent_pid = self() - Agent.update(StreamingTestStoreMock, fn _ -> %{parent_pid: parent_pid} end) - - {:ok, stream_session_id} = poll_stream_session_id(pid) - - params = - Map.merge(params, %{stream_session_id: stream_session_id, module: StreamingSocketMock}) - - {:ok, streaming_pid} = StreamingSocket.start_link(params) - assert {:ok, _} = StreamingSocket.subscribe_get_trade_status(streaming_pid) - - :ok - end - - test "trade transaction - open and close transaction", %{pid: pid} do - buy_args = %{ - operation: :buy, - custom_comment: "Buy transaction", - price: 1200.0, - symbol: "LITECOIN", - type: :open, - volume: 1.0 - } - - buy = TradeTransaction.Command.new(buy_args) - - assert {:ok, %TradeTransaction{order: open_order_id}} = - MainSocket.trade_transaction(pid, buy) - - assert_receive {:ok, %TradeStatus{}}, @default_wait_time - - status = TradeTransactionStatus.Query.new(open_order_id) - assert {:ok, %TradeTransactionStatus{}} = MainSocket.trade_transaction_status(pid, status) - - # get all opened only trades - trades_query = Trades.Query.new(true) - assert {:ok, %TradeInfos{data: data}} = MainSocket.get_trades(pid, trades_query) - - position_to_close = - data - |> Enum.find(&(&1.order_closed == open_order_id)) - - close_args = %{ - operation: :buy, - custom_comment: "Close transaction", - price: position_to_close.open_price - 0.01, - symbol: "LITECOIN", - order: position_to_close.order_opened, - type: :close, - volume: 1.0 - } - - close = TradeTransaction.Command.new(close_args) - - assert {:ok, %TradeTransaction{}} = MainSocket.trade_transaction(pid, close) - assert_receive {:ok, %TradeStatus{order: close_order_id}}, @default_wait_time + @tag timeout: 40 * 1000 + test "sends ping after login", %{params: params} do + {:ok, pid} = MainSocket.start_link(params) - status = TradeTransactionStatus.Query.new(close_order_id) + Process.sleep(30 * 1000 + 1) - assert {:ok, %TradeTransactionStatus{status: :accepted}} = - MainSocket.trade_transaction_status(pid, status) - end + assert Process.alive?(pid) == true end end diff --git a/test/xtb_client/streaming_socket_test.exs b/test/xtb_client/streaming_socket_test.exs index bdcc3d2..f6e189e 100644 --- a/test/xtb_client/streaming_socket_test.exs +++ b/test/xtb_client/streaming_socket_test.exs @@ -1,17 +1,10 @@ defmodule XtbClient.StreamingSocketTest do @moduledoc false - use ExUnit.Case + use ExUnit.Case, async: true doctest XtbClient.StreamingSocket - alias XtbClient.Messages alias XtbClient.MainSocket alias XtbClient.StreamingSocket - alias XtbClient.StreamingSocketMock - alias XtbClient.StreamingTestStoreMock - - import XtbClient.MainSocket.E2EFixtures - - @default_wait_time 60 * 1000 setup do Dotenvy.source([ @@ -27,194 +20,28 @@ defmodule XtbClient.StreamingSocketTest do params = %{ url: url, - type: :demo, user: user, password: passwd, + type: :demo, app_name: "XtbClient" } - {:ok, pid} = start_supervised({MainSocket, params}) - {:ok, stream_session_id} = poll_stream_session_id(pid) - - {:ok, - %{ - params: %{ - url: url, - type: type, - stream_session_id: stream_session_id, - module: StreamingSocketMock - }, - main: pid - }} - end - - describe "session management" do - @tag timeout: 40 * 1000 - test "sends ping after login", %{params: params} do - {:ok, pid} = StreamingSocket.start_link(params) + {:ok, mpid} = start_supervised({MainSocket, params}) + Process.sleep(100) + MainSocket.stream_session_id(mpid, self()) - Process.sleep(30 * 1000 + 1) - - assert Process.alive?(pid) == true + receive do + {:"$gen_cast", {:stream_session_id, session_id}} -> + {:ok, %{params: %{url: url, type: type, stream_session_id: session_id}}} end end - describe "public API" do - setup context do - {:ok, pid} = start_supervised({StreamingSocket, context.params}) - - {:ok, _store} = start_supervised(StreamingTestStoreMock) - - parent_pid = self() - Agent.update(StreamingTestStoreMock, fn _ -> %{parent_pid: parent_pid} end) - - {:ok, %{pid: pid}} - end - - test "subscribe to get balance", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_balance(pid) - - buy_args = %{ - operation: :buy, - custom_comment: "Buy transaction", - price: 1200.0, - symbol: "LITECOIN", - type: :open, - volume: 1.0 - } - - {:ok, %{order: order_id}} = open_trade(main, buy_args) - - assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time - - close_args = %{ - operation: :buy, - custom_comment: "Close transaction", - symbol: "LITECOIN", - type: :close, - volume: 1.0 - } - - {:ok, _} = close_trade(main, order_id, close_args) - - assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time - end - - @tag timeout: @default_wait_time - test "subscribe to get candles", %{pid: pid} do - args = "LITECOIN" - query = Messages.Candles.Query.new(args) - assert {:ok, _} = StreamingSocket.subscribe_get_candles(pid, query) - - assert_receive {:ok, %Messages.Candle{}}, @default_wait_time - end - - test "subscribe to keep alive", %{pid: pid} do - assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid) - - assert_receive {:ok, %Messages.KeepAlive{}}, @default_wait_time - end - - @tag skip: true - test "subscribe to get news", %{pid: pid} do - assert {:ok, _} = StreamingSocket.subscribe_get_news(pid) + @tag timeout: 40 * 1000 + test "sends ping after login", %{params: params} do + {:ok, pid} = StreamingSocket.start_link(params) - assert_receive {:ok, %Messages.NewsInfo{}}, @default_wait_time - end - - test "subscribe to get profits", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_profits(pid) - - buy_args = %{ - operation: :buy, - custom_comment: "Buy transaction", - price: 1200.0, - symbol: "LITECOIN", - type: :open, - volume: 1.0 - } - - {:ok, %{order: order_id}} = open_trade(main, buy_args) - - assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time - - close_args = %{ - operation: :buy, - custom_comment: "Close transaction", - symbol: "LITECOIN", - type: :close, - volume: 1.0 - } - - {:ok, _} = close_trade(main, order_id, close_args) - - assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time - end - - test "subscribe to get tick prices", %{pid: pid} do - args = %{symbol: "LITECOIN"} - query = Messages.Quotations.Query.new(args) - assert {:ok, _} = StreamingSocket.subscribe_get_tick_prices(pid, query) - - assert_receive {:ok, %Messages.TickPrice{}}, @default_wait_time - end - - test "subscribe to get trades", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_trades(pid) - - buy_args = %{ - operation: :buy, - custom_comment: "Buy transaction", - price: 1200.0, - symbol: "LITECOIN", - type: :open, - volume: 1.0 - } + Process.sleep(30 * 1000 + 1) - {:ok, %{order: order_id}} = open_trade(main, buy_args) - - assert_receive {:ok, %Messages.TradeInfo{}}, @default_wait_time - - close_args = %{ - operation: :buy, - custom_comment: "Close transaction", - symbol: "LITECOIN", - type: :close, - volume: 1.0 - } - - {:ok, _} = close_trade(main, order_id, close_args) - - assert_receive {:ok, %Messages.TradeInfo{}}, @default_wait_time - end - - test "subscribe to trade status", %{pid: pid, main: main} do - assert {:ok, _} = StreamingSocket.subscribe_get_trade_status(pid) - - buy_args = %{ - operation: :buy, - custom_comment: "Buy transaction", - price: 1200.0, - symbol: "LITECOIN", - type: :open, - volume: 1.0 - } - - {:ok, %{order: order_id}} = open_trade(main, buy_args) - - assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time - - close_args = %{ - operation: :buy, - custom_comment: "Close transaction", - symbol: "LITECOIN", - type: :close, - volume: 1.0 - } - - {:ok, _} = close_trade(main, order_id, close_args) - - assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time - end + assert Process.alive?(pid) == true end end