Skip to content

Commit

Permalink
Websocket (re-)connect more robust. Strategys pick up after last sell…
Browse files Browse the repository at this point in the history
… order properly now. And more tiny fixes.
  • Loading branch information
aLca committed Feb 20, 2025
1 parent f29d7ac commit decd91e
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,23 @@ def on_open(self, ws):

def start_socket(self):
def run_socket():
print("Starting WebSocket connection...")
self.websocket = websocket.WebSocketApp(self.ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open)
self.websocket.run_forever(reconnect=5)

import time
while True:
try:
print("Starting WebSocket connection...")
self.websocket = websocket.WebSocketApp(
self.ws_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)

self.websocket.run_forever(ping_interval=10, ping_timeout=3)
except Exception as e:
print(f"WebSocket encountered an exception: {e}")
print("WebSocket disconnected. Reconnecting in 3 seconds...")
time.sleep(3)
self.websocket_thread = threading.Thread(target=run_socket, daemon=True)
self.websocket_thread.start()

Expand All @@ -77,9 +86,12 @@ def stop_socket(self):
print("WebSocket connection closed.")

def fetch_ohlcv(self, symbol, interval, since=None, until=None):
import time

print('STORE::FETCH SINCE:', since)
start_timestamp = since
data = []
max_retries = 5

while True:
params = {
Expand All @@ -94,9 +106,24 @@ def fetch_ohlcv(self, symbol, interval, since=None, until=None):
if until:
params['endTime'] = until

url = f"https://api.binance.com/api/v3/klines"
response = requests.get(url, params=params)
new_data = response.json()
url = "https://api.binance.com/api/v3/klines"

retries = 0
new_data = None
while retries < max_retries:
try:
response = requests.get(url, params=params, timeout=10)
new_data = response.json()
break # exit retry loop if successful
except (requests.exceptions.RequestException, requests.exceptions.JSONDecodeError) as e:
wait_time = 2 ** retries # exponential backoff
print(f"Error fetching data (attempt {retries + 1}/{max_retries}): {e}. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
retries += 1

if retries == max_retries:
print("Max retries reached. Exiting fetch.")
break

if not new_data or len(new_data) == 0:
break
Expand All @@ -107,8 +134,8 @@ def fetch_ohlcv(self, symbol, interval, since=None, until=None):
start_timestamp = new_data[-1][0] + 1

if data:
start_time = datetime.fromtimestamp(data[0][0]/1000, tz=pytz.UTC)
end_time = datetime.fromtimestamp(data[-1][0]/1000, tz=pytz.UTC)
start_time = datetime.fromtimestamp(data[0][0] / 1000, tz=pytz.UTC)
end_time = datetime.fromtimestamp(data[-1][0] / 1000, tz=pytz.UTC)
print(f"Fetched data from {start_time} to {end_time}")

return data
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastquant.strategies.custom_indicators.ChaikinVolatility import ChaikinVolatility

'''
It was actually planned as a “keep it simple stupid” proof of concept. But as things happen, it totally escalated once again.
It was actually planned as a “keep it simple stupid” proof of concept. but as things happen, it totally escalated once again.
But anyway, I can pull more rabbits out of my hat - so I've decided to make this available to everyone.
Sharing is caring.
Expand Down
15 changes: 7 additions & 8 deletions dependencies/fastquant/strategies/Order_Chain_Kioseff_Trading.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,19 @@ def get_level_index(self, price):

class Order_Chain_Kioseff_Trading(BaseStrategy):
params = (
('chain', 150),
('ticks', 50),
('chain', 1500),
('ticks', 150),
('tick_size', 0.00010),
('signal_threshold', 100), # 1000 threshold for generating signals
('signal_threshold', 1000), # 1000 threshold for generating signals
('debug', False),
('backtest', None),
("dca_deviation", 1),
("take_profit_percent", 1),
("dca_deviation", 2.5),
("take_profit_percent", 2),
('percent_sizer', 0.001), # 0.01 -> 1%
)

def __init__(self):
super().__init__()
BuySellArrows(self.data0, barplot=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.order_chain = OrderChainIndicator(chain=self.p.chain, ticks=self.p.ticks, tick_size=self.p.tick_size)

self.buy_executed = False
Expand Down
3 changes: 2 additions & 1 deletion dependencies/fastquant/strategies/QQE_Hullband_VolumeOsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class QQEIndicator(bt.Indicator):
)
lines = ("qqe_line",)

def __init__(self):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.rsi = bt.indicators.RSI(self.data.close, period=self.p.period)
self.atr = bt.indicators.ATR(self.data, period=self.p.fast)
self.dar = bt.If(self.atr > 0, bt.indicators.EMA(self.atr - self.p.q, period=int((self.p.period * 2) - 1)), 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import backtrader as bt
from fastquant.strategies.base import BaseStrategy, BuySellArrows
from fastquant.strategies.base import BaseStrategy
from fastquant.strategies.custom_indicators.MesaAdaptiveMovingAverage import MAMA

class SMA_Cross_MESAdaptivePrime(BaseStrategy, bt.SignalStrategy):
Expand All @@ -14,7 +14,6 @@ class SMA_Cross_MESAdaptivePrime(BaseStrategy, bt.SignalStrategy):
)

def __init__(self, **kwargs):
BuySellArrows(self.data0, barplot=True)
super().__init__(**kwargs)
# Simple Moving Averages
self.sma17 = bt.ind.SMA(period=17)
Expand Down
129 changes: 58 additions & 71 deletions dependencies/fastquant/strategies/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def next(self):
super().next()

if self.lines.buy[0]:
self.lines.buy[0] -= self.data.low[0] * 0.2
self.lines.buy[0] -= self.data.low[0] * 0.02

if self.lines.sell[0]:
self.lines.sell[0] += self.data.high[0] * 0.2
self.lines.sell[0] += self.data.high[0] * 0.02

plotlines = dict(
buy=dict(marker='$\u21E7$', markersize=12.0),
sell=dict(marker='$\u21E9$', markersize=12.0)
buy=dict(marker='$\u21E7$', markersize=16.0),
sell=dict(marker='$\u21E9$', markersize=16.0)
)

class BaseStrategy(bt.Strategy):
Expand All @@ -45,7 +45,7 @@ class BaseStrategy(bt.Strategy):
('amount', None),
('coin', None),
('collateral', None),
('debug', False),
('debug', True),
('backtest', None),
('is_training', None),
('use_stoploss', None),
Expand Down Expand Up @@ -85,7 +85,6 @@ def init_live_trading(self):
else:
self._init_standard_exchange()


def _init_alert_system(self, coin_name=".__!_"):
"""Initialize alert system with Telegram and Discord services if enabled"""
if not self.p.enable_alerts:
Expand Down Expand Up @@ -160,23 +159,6 @@ def send_alert(self, message: str):
print('Alert System not enabled.')
pass

def _init_standard_exchange(self):
"""Initialize standard exchange trading with JackRabbitRelay"""
alert_manager = self._init_alert_system()

# Wait briefly for alert system initialization
time.sleep(1)

self.exchange = self.p.exchange
self.account = self.p.account
self.asset = self.p.asset
self.rabbit = JrrOrderBase(alert_manager=alert_manager)

self.order_queue = queue.Queue()
self.order_thread = threading.Thread(target=self.process_orders)
self.order_thread.daemon = True
self.order_thread.start()

def _init_pancakeswap(self):
"""Initialize PancakeSwap trading"""
self.pcswap = _web3order(coin=self.p.coin, collateral=self.p.collateral)
Expand Down Expand Up @@ -289,9 +271,10 @@ def __init__(self, **kwargs):


def log(self, txt, dt=None):
if len(self.datas) == 0 or len(self.datas[0]) == 0:
print("No data available yet, skipping log entry.")
return
if self.p.backtest == False:
if len(self.datas) == 0 or len(self.datas[0]) == 0:
print("No data available yet, skipping log entry.")
return
dt = dt or self.datas[0].datetime.datetime(0)
print("%s, %s" % (dt.isoformat(), txt))

Expand Down Expand Up @@ -393,31 +376,26 @@ def load_trade_data(self):
try:
file_path = f"/home/JackrabbitRelay2/Data/Mimic/{self.account}.history"
if sys.platform != "win32":
os.sync() # Sync before reading attempt - it might being open/write from JRR
os.sync() # Ensure file is in sync before reading
with open(file_path, 'r') as file:
orders = file.read().strip().split('\n')
orders.reverse()

found_sell = False
for order in orders:
if not order.strip(): # Skip empty strings
continue
orders = [line for line in file.read().strip().split('\n') if line.strip()]

# Process orders starting from the most recent
for order_str in reversed(orders):
try:
order_data = json.loads(order)
order_data = json.loads(order_str)
except json.JSONDecodeError:
print(f"Skipping invalid JSON: {order}")
print(f"Skipping invalid JSON: {order_str}")
continue

action = order_data.get('Action')
asset = order_data.get('Asset')

# Stop if a sell is encountered – meaning any prior buys are irrelevant
if action == 'sell' and asset == self.asset:
found_sell = True
continue
break

if not found_sell and action == 'buy' and asset == self.asset:
# _amount = order_data.get(self.p.coin, 0.0)
if action == 'buy' and asset == self.asset:
entry_price = order_data.get('Price', 0.0)
self.entry_prices.append(entry_price)
self.sizes.append(self.p.amount)
Expand All @@ -426,45 +404,41 @@ def load_trade_data(self):
if self.entry_prices and self.sizes:
print(f"Loaded {len(self.entry_prices)} buy orders after the last sell.")
self.calc_averages()
self.buy_executed = True
self.entry_price = self.average_entry_price
else:
if found_sell:
print("No buy orders found after the last sell.")
else:
print("No executed sell orders found.")

if orders and orders[0].strip():
print("No buy orders found after the last sell.")

# Process the last order for free USDT etc.
if orders and orders[-1].strip():
try:
last_order_data = json.loads(orders[0])
last_order_data = json.loads(orders[-1])
usdt_value = last_order_data.get('USDT', 0.0)
print(f"Free USDT: {usdt_value:.9f}")
self.stake_to_use = usdt_value
print(f"Last modified: {os.path.getmtime(file_path)}")
except json.JSONDecodeError:
print("Error parsing the last order, resetting position state.")
self.stake_to_use = 1000.0 # new Default :<
self.stake_to_use = 1000.0
self.reset_position_state()
else:
self.reset_position_state()

# TODO :: Figure out why still doesnt set an default when account history file is empty
except FileNotFoundError:
print(f"History file not found for account {self.account}.")
self.reset_position_state()
self.stake_to_use = 1000.0 # Default stake when file is not found
self.stake_to_use = 1000.0
except PermissionError:
print(f"Permission denied when trying to access the history file for account {self.account}.")
self.reset_position_state()
self.stake_to_use = 1000.0 # Default stake when permission is denied
self.stake_to_use = 1000.0
except requests.exceptions.RequestException as e:
print(f"Error fetching trade data: {e}")
self.reset_position_state()
self.stake_to_use = 1000.0 # Default stake when there's a request exception
self.stake_to_use = 1000.0
except Exception as e:
print(f"Unexpected error occurred while loading trade data: {e}")
self.reset_position_state()
self.stake_to_use = 1000.0 # Default stake for any other unexpected errors
self.stake_to_use = 1000.0

def calc_averages(self):
total_value = sum(entry_price * size for entry_price, size in zip(self.entry_prices, self.sizes))
Expand Down Expand Up @@ -495,23 +469,36 @@ def start(self):

def next(self):
self.conditions_checked = False
if self.params.backtest == False:
if self.live_data == True:
self.stake = self.stake_to_use * self.p.percent_sizer / self.dataclose # TODO figure out why no default stake is set on empty history
if self.params.backtest == False and self.live_data == True:
# Ensure we have live data and update the stake if so
if not self.params.backtest and getattr(self, 'live_data', False):
self.stake = self.stake_to_use * self.p.percent_sizer / self.dataclose

# Debug: Print current state
if self.p.debug:
print(f"DEBUG: live_data={getattr(self, 'live_data', False)}, buy_executed={self.buy_executed}, DCA={self.DCA}, print_counter={self.print_counter}")

# If we already have a buy, update and print the position report every 10th call
if self.buy_executed and self.p.debug:
self.print_counter += 1
if self.print_counter % 10 == 0:
print(f'| {datetime.utcnow()}'
f'\n|{"-"*99}¬'
f'\n| Position Report'
f'\n| Price: {self.data.close[0]:.9f}'
f'\n| Entry: {self.average_entry_price:.9f}'
f'\n| TakeProfit: {self.take_profit_price:.9f}'
f'\n|{"-"*99}¬')

if not self.buy_executed:
self.buy_or_short_condition()
elif self.DCA == True and self.buy_executed:
self.sell_or_cover_condition()
self.dca_or_short_condition()
elif self.DCA == False and self.buy_executed:
self.sell_or_cover_condition()

if self.live_data == True and self.buy_executed and self.p.debug:
self.print_counter += 1
if self.print_counter % 1 == 60: # reduce logging spam
print(f'| {datetime.utcnow()}\n|{"-"*99}¬\n| Position Report\n| Price: {self.data.close[0]:.9f}\n| Entry: {self.average_entry_price:.9f}\n| TakeProfit: {self.take_profit_price:.9f}\n|{"-"*99}¬')

if not self.buy_executed:
self.buy_or_short_condition()
elif self.DCA == True and self.buy_executed:
self.sell_or_cover_condition()
self.dca_or_short_condition()
elif self.DCA == False and self.buy_executed:
self.sell_or_cover_condition()


elif self.params.backtest == True:
self.stake = self.broker.getcash() * self.p.percent_sizer / self.dataclose
if not self.buy_executed:
Expand Down Expand Up @@ -622,7 +609,7 @@ def stop(self):
print("+-------------------------------------+-----------------+-------------+-----------+------------+----------------+--------------+------------------+--------------+------+--------+")
print("| {0:<35} | {1:<15} | {2:<11} | {3:<9} | {4:<10} | {5:<14} | {6:<12} | {7:<16} | {8:<12} | {9:<4} | {10:<6} |".format(
self.__class__.__name__,
1000, # initial capital (example)
1000,
round(self.final_value, 2),
round(self.total_pnl, 2),
round(self.total_pnl / 1000 * 100, 2), # Return in percentage
Expand Down
Loading

0 comments on commit decd91e

Please sign in to comment.