Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic buy bot #526

Open
wants to merge 10 commits into
base: data.ticktools
Choose a base branch
from
263 changes: 263 additions & 0 deletions examples/basic_order_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# from pprint import pformat
from functools import partial
from decimal import Decimal
from typing import Callable

import tractor
import trio
from uuid import uuid4

from piker.service import maybe_open_pikerd
from piker.accounting import dec_digits
from piker.clearing import (
open_ems,
OrderClient,
)
# TODO: we should probably expose these top level in this subsys?
from piker.clearing._messages import (
Order,
Status,
BrokerdPosition,
)
from piker.data import (
iterticks,
Flume,
open_feed,
Feed,
# ShmArray,
)


# TODO: handle other statuses:
# - fills, errors, and position tracking
async def wait_for_order_status(
trades_stream: tractor.MsgStream,
oid: str,
expect_status: str,

) -> tuple[
list[Status],
list[BrokerdPosition],
]:
'''
Wait for a specific order status for a given dialog, return msg flow
up to that msg and any position update msgs in a tuple.

'''
# Wait for position message before moving on to verify flow(s)
# for the multi-order position entry/exit.
status_msgs: list[Status] = []
pp_msgs: list[BrokerdPosition] = []

async for msg in trades_stream:
match msg:
case {'name': 'position'}:
ppmsg = BrokerdPosition(**msg)
pp_msgs.append(ppmsg)

case {
'name': 'status',
}:
msg = Status(**msg)
status_msgs.append(msg)

# if we get the status we expect then return all
# collected msgs from the brokerd dialog up to the
# exected msg B)
if (
msg.resp == expect_status
and msg.oid == oid
):
return status_msgs, pp_msgs


async def bot_main():
'''
Boot the piker runtime, open an ems connection, submit
and process orders statuses in real-time.

'''
ll: str = 'info'

# open an order ctl client, live data feed, trio nursery for
# spawning an order trailer task
client: OrderClient
trades_stream: tractor.MsgStream
feed: Feed
accounts: list[str]

fqme: str = 'btcusdt.usdtm.perp.binance'

async with (

# TODO: do this implicitly inside `open_ems()` ep below?
# init and sync actor-service runtime
maybe_open_pikerd(
loglevel=ll,
debug_mode=True,

),
open_ems(
fqme,
mode='paper', # {'live', 'paper'}
# mode='live', # for real-brokerd submissions
loglevel=ll,

) as (
client, # OrderClient
trades_stream, # tractor.MsgStream startup_pps,
_, # positions
accounts,
_, # dialogs
),

open_feed(
fqmes=[fqme],
loglevel=ll,

# TODO: if you want to throttle via downsampling
# how many tick updates your feed received on
# quote streams B)
# tick_throttle=10,
) as feed,

trio.open_nursery() as tn,
):
assert accounts
print(f'Loaded binance accounts: {accounts}')

flume: Flume = feed.flumes[fqme]
min_tick = Decimal(flume.mkt.price_tick)
min_tick_digits: int = dec_digits(min_tick)
price_round: Callable = partial(
round,
ndigits=min_tick_digits,
)

quote_stream: trio.abc.ReceiveChannel = feed.streams['binance']


# always keep live limit 0.003% below last
# clearing price
clear_margin: float = 0.9997

async def trailer(
order: Order,
):
# ref shm OHLCV array history, if you want
# s_shm: ShmArray = flume.rt_shm
# m_shm: ShmArray = flume.hist_shm

# NOTE: if you wanted to frame ticks by type like the
# the quote throttler does.. and this is probably
# faster in terms of getting the latest tick type
# embedded value of interest?
# from piker.data._sampling import frame_ticks

async for quotes in quote_stream:
for fqme, quote in quotes.items():
# print(
# f'{quote["symbol"]} -> {quote["ticks"]}\n'
# f'last 1s OHLC:\n{s_shm.array[-1]}\n'
# f'last 1m OHLC:\n{m_shm.array[-1]}\n'
# )

for tick in iterticks(
quote,
reverse=True,
# types=('trade', 'dark_trade'), # defaults
):

await client.update(
uuid=order.oid,
price=price_round(
clear_margin
*
tick['price']
),
)
msgs, pps = await wait_for_order_status(
trades_stream,
order.oid,
'open'
)
# if multiple clears per quote just
# skip to the next quote?
break


# get first live quote to be sure we submit the initial
# live buy limit low enough that it doesn't clear due to
# a stale initial price from the data feed layer!
first_ask_price: float | None = None
async for quotes in quote_stream:
for fqme, quote in quotes.items():
# print(quote['symbol'])
for tick in iterticks(quote, types=('ask')):
first_ask_price: float = tick['price']
break

if first_ask_price:
break

# setup order dialog via first msg
price: float = price_round(
clear_margin
*
first_ask_price,
)

# compute a 1k USD sized pos
size: float = round(1e3/price, ndigits=3)

order = Order(

# docs on how this all works, bc even i'm not entirely
# clear XD. also we probably want to figure out how to
# offer both the paper engine running and the brokerd
# order ctl tasks with the ems choosing which stream to
# route msgs on given the account value!
account='paper', # use built-in paper clearing engine and .accounting
# account='binance.usdtm', # for live binance futes

oid=str(uuid4()),
exec_mode='live', # {'dark', 'live', 'alert'}

action='buy', # TODO: remove this from our schema?

size=size,
symbol=fqme,
price=price,
brokers=['binance'],
)
await client.send(order)

msgs, pps = await wait_for_order_status(
trades_stream,
order.oid,
'open',
)

assert not pps
assert msgs[-1].oid == order.oid

# start "trailer task" which tracks rt quote stream
tn.start_soon(trailer, order)

try:
# wait for ctl-c from user..
await trio.sleep_forever()
except KeyboardInterrupt:
# cancel the open order
await client.cancel(order.oid)

msgs, pps = await wait_for_order_status(
trades_stream,
order.oid,
'canceled'
)
raise


if __name__ == '__main__':
trio.run(bot_main)
1 change: 1 addition & 0 deletions piker/brokers/binance/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ async def _api(

# Check if we're configured to route order requests to the
# venue equivalent's testnet.
use_testnet: bool = False
section_name: str = self.venue2configkey[venue_key]
if subconf := self.conf.get(section_name):
use_testnet = (
Expand Down
2 changes: 1 addition & 1 deletion piker/brokers/binance/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def open_trade_dialog(
wss: NoBsWs
async with (
client.manage_listen_key() as listen_key,
open_autorecon_ws(f'{wss_url}/ws/{listen_key}') as wss,
open_autorecon_ws(f'{wss_url}/?listenKey={listen_key}') as wss,
):
nsid: int = time_ns()
await wss.send_msg({
Expand Down
6 changes: 3 additions & 3 deletions piker/brokers/binance/venues.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
# 'wss://ws-api.binance.com:443/ws-api/v3',

# https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
_futes_ws: str = f'wss://fstream.{_domain}/ws/'
_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws/'
_futes_ws: str = f'wss://fstream.{_domain}/ws'
_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws'

# test nets
# NOTE: spot test network only allows certain ep sets:
Expand All @@ -58,7 +58,7 @@
# 'wss://testnet.binance.vision/ws-api/v3'

_testnet_futes_url: str = 'https://testnet.binancefuture.com'
_testnet_futes_ws: str = 'wss://stream.binancefuture.com'
_testnet_futes_ws: str = 'wss://stream.binancefuture.com/ws'


MarketType = Literal[
Expand Down