From abc3e8655556ba9e1ac046636403dc8a204d6ad9 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 18 Feb 2017 16:01:06 +1100 Subject: [PATCH 01/35] Started an IB PriceHandler and a test case. --- qstrader/price_handler/ib_bar.py | 75 ++++++++++++++++++++++++++++++++ tests/test_ib_price_handler.py | 28 ++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 qstrader/price_handler/ib_bar.py create mode 100644 tests/test_ib_price_handler.py diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py new file mode 100644 index 00000000..977bc28a --- /dev/null +++ b/qstrader/price_handler/ib_bar.py @@ -0,0 +1,75 @@ +import os +import datetime +from .base import AbstractBarPriceHandler + +from ibapi.client import EClient +from ibapi.wrapper import EWrapper +from ibapi.contract import * + +import logging + +class IBClient(EClient): + def __init__(self, wrapper): + EClient.__init__(self, wrapper) + + +class IBWrapper(EWrapper): + def __init__(self): + EWrapper.__init__(self) + + def historicalData(self, reqId:int, date:str, open:float, high:float, + low: float, close:float, volume: int, barCount: int, + WAP:float, hasGaps: int): + logging.error("RECEIVED HISTORICAL DATA BAR.") + + +class IBBarPriceHandler(AbstractBarPriceHandler, IBWrapper, IBClient): + """ + Designed to feed either live or historic market data bars + from an Interactive Brokers connection. + + Each "IB-connected" module requires its 'client' and 'wrapper' methods. + + TODO: + * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode + """ + def __init__( + self, events_queue, tickers, settings, mode="historic", + hist_end_date = datetime.datetime.now() - datetime.timedelta(days=3), + hist_duration="1 D", hist_barsize="1 min" + ): + logging.basicConfig( level=logging.ERROR ) + + self.ib_wrapper = IBWrapper() + self.ib_client = IBClient(self.ib_wrapper) + + self.ib_client.connect("127.0.0.1",4001,0) + + self.tickers = {} + self.mode = mode + self.continue_backtest = True + self.hist_end_date = hist_end_date + self.hist_duration = hist_duration + self.hist_barsize = hist_barsize + + for ticker in tickers: + self._subscribe_ticker(ticker) + + import time + time.sleep(10) + + + def _subscribe_ticker(self, ticker): + if ticker not in self.tickers: + # Set up an IB ContractS + contract = Contract() + contract.exchange = "SMART" + contract.symbol = ticker + contract.secType = "STK" + contract.currency = "USD" + + if self.mode == "historic": + end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") + self.ib_client.reqHistoricalData( + 0, contract, end_time, self.hist_duration, self.hist_barsize, + "TRADES", True, 2, None) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py new file mode 100644 index 00000000..2443345f --- /dev/null +++ b/tests/test_ib_price_handler.py @@ -0,0 +1,28 @@ +import unittest + +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from qstrader.compat import queue +from qstrader import settings + + +class TestPriceHandlerSimpleCase(unittest.TestCase): + def setUp(self): + """ + Set up the PriceHandler object with a small + set of initial tickers for a backtest in historic mode. + """ + self.config = settings.TEST + fixtures_path = self.config.CSV_DATA_DIR + events_queue = queue.Queue() + init_tickers = ["FB"] + self.price_handler = IBBarPriceHandler( + events_queue, init_tickers, self.config + ) + + def test(self): + self.assertEqual(1,2) + + + +if __name__ == "__main__": + unittest.main() From 55df528a81c016aed440ad5b1dab08e2694241a1 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 25 Feb 2017 17:50:59 +1100 Subject: [PATCH 02/35] Start working on IBService; the master/background/network communication manager to IB --- qstrader/services/ib.py | 137 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 qstrader/services/ib.py diff --git a/qstrader/services/ib.py b/qstrader/services/ib.py new file mode 100644 index 00000000..cab48e6a --- /dev/null +++ b/qstrader/services/ib.py @@ -0,0 +1,137 @@ +import sys +import argparse +import datetime +import collections +import inspect +import queue + +import logging +import time +import os.path + +from ibapi.wrapper import wrapper +from ibapi.client import EClient +from ibapi.utils import iswrapper + +#types +from ibapi.utils import (current_fn_name, BadMessage) +from ibapi.common import * +from ibapi.order_condition import * +from ibapi.contract import * +from ibapi.order import * +from ibapi.order_state import * +from ibapi.execution import Execution +from ibapi.execution import ExecutionFilter +from ibapi.commission_report import CommissionReport +from ibapi.scanner import ScannerSubscription +from ibapi.ticktype import * + +from ibapi.account_summary_tags import * + +from ContractSamples import ContractSamples +from OrderSamples import OrderSamples +from AvailableAlgoParams import AvailableAlgoParams +from ScannerSubscriptionSamples import ScannerSubscriptionSamples +from FaAllocationSamples import FaAllocationSamples + + +class IBService(IBWrapper, IBClient): + """ + The IBService is the primary conduit of data from QStrader to Interactive Brokers. + This service provides functions to request data, and allows for + callbacks to be triggered, which populates "data queues" with the response. + + All methods of the EClient are available (i.e. API Requests), as are + the callbacks for EWrapper (i.e. API responses). It also provides a set of Queues + which are populated with the responses from EWrapper. Other components in the + system should use these queues collect the API response data. + + Any module or component that wishes to interact with IB should do so by using + methods offered in this class. This ensures that the logic required to talk with IB + is contained within this class exclusively, with the added benefit that we + can easily create mock instances of the IBService for testing. + """ + def __init__(self): + IBWrapper.__init__(self) + IBClient.__init__(self, wrapper=self) + + self.historicalDataQueue = queue.Queue() + self.waitingHistoricalData = [] + + # Connect to IB and make the historic data request. + # self.connect("127.0.0.1", 4001, clientId=0) + # self.historicalDataRequests_req() + # + # while (self.conn.isConnected() or not self.msg_queue.empty()) and len(self.waitingHistoricalData) != 0: + # try: + # text = self.msg_queue.get(block=True, timeout=0.2) + # if len(text) > MAX_MSG_LEN: + # self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), + # "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) + # self.disconnect() + # break + # except queue.Empty: + # logging.debug("queue.get: empty") + # else: + # fields = comm.read_fields(text) + # logging.debug("fields %s", fields) + # self.decoder.interpret(fields) + # + # # print("conn:%d queue.sz:%d", + # # self.conn.isConnected(), + # # self.msg_queue.qsize()) + # + # self.disconnect() + + + def error(self, reqId:TickerId, errorCode:int, errorString:str): + super().error(reqId, errorCode, errorString) + print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString) + + + """ + Append `reqId` to waitingHistoricalData, then call the super method. + """ + def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, + durationStr:str, barSizeSetting:str, whatToShow:str, + useRTH:int, formatDate:int, chartOptions:TagValueList): + self.waitingHistoricalData.append(reqId) + print("REQUESTING HISTORIC, WAITING FOR %s" % len(self.waitingHistoricalData)) + super().reqHistoricalData( reqId, contract, endDateTime, + durationStr, barSizeSetting, whatToShow, + useRTH, formatDate, chartOptions) + + + """ + Creates a historical data request for CBA. + """ + def historicalDataRequests_req(self): + contract = Contract() + contract.exchange = "SMART" + contract.symbol = "CBA" + contract.secType = "STK" + contract.currency = "AUD" + + queryTime = (datetime.datetime.today() - + datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S") + self.reqHistoricalData(4002, contract, queryTime, + "10 D", "1 min", "TRADES", 1, 1, None) + + + """ + Populate the HistoricalData queue. + """ + def historicalData(self, reqId:TickerId , date:str, open:float, high:float, + low:float, close:float, volume:int, barCount:int, + WAP:float, hasGaps:int): + print("RECEIVED HISTORIC DATA") + self.historicalDataQueue.put((reqId, date, open, high, low, close, + volume, barCount, WAP, hasGaps)) + + """ + Remove `reqId` from waitingHistoricalData + TODO: Will it work with multiple historical requests for same symbol? + """ + def historicalDataEnd(self, reqId:int, start:str, end:str): + print("FINISHED FOR %s" % reqId) + self.waitingHistoricalData.remove(reqId) From 63e1710e140992bdf09cbbc2b0ca2c88c53f3e61 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 25 Feb 2017 17:54:18 +1100 Subject: [PATCH 03/35] Remove prebuilt hist data request --- qstrader/services/ib.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/qstrader/services/ib.py b/qstrader/services/ib.py index cab48e6a..54e240b4 100644 --- a/qstrader/services/ib.py +++ b/qstrader/services/ib.py @@ -102,22 +102,6 @@ def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, useRTH, formatDate, chartOptions) - """ - Creates a historical data request for CBA. - """ - def historicalDataRequests_req(self): - contract = Contract() - contract.exchange = "SMART" - contract.symbol = "CBA" - contract.secType = "STK" - contract.currency = "AUD" - - queryTime = (datetime.datetime.today() - - datetime.timedelta(days=180)).strftime("%Y%m%d %H:%M:%S") - self.reqHistoricalData(4002, contract, queryTime, - "10 D", "1 min", "TRADES", 1, 1, None) - - """ Populate the HistoricalData queue. """ From e6731856a585daee7a0c94d97dd854a090f84c59 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 25 Feb 2017 18:00:51 +1100 Subject: [PATCH 04/35] Rename references to EWrapper/EClient --- qstrader/{services => service}/ib.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) rename qstrader/{services => service}/ib.py (91%) diff --git a/qstrader/services/ib.py b/qstrader/service/ib.py similarity index 91% rename from qstrader/services/ib.py rename to qstrader/service/ib.py index 54e240b4..fa1d3e7f 100644 --- a/qstrader/services/ib.py +++ b/qstrader/service/ib.py @@ -9,7 +9,7 @@ import time import os.path -from ibapi.wrapper import wrapper +from ibapi.wrapper import EWrapper from ibapi.client import EClient from ibapi.utils import iswrapper @@ -28,14 +28,8 @@ from ibapi.account_summary_tags import * -from ContractSamples import ContractSamples -from OrderSamples import OrderSamples -from AvailableAlgoParams import AvailableAlgoParams -from ScannerSubscriptionSamples import ScannerSubscriptionSamples -from FaAllocationSamples import FaAllocationSamples - -class IBService(IBWrapper, IBClient): +class IBService(EWrapper, EClient): """ The IBService is the primary conduit of data from QStrader to Interactive Brokers. This service provides functions to request data, and allows for @@ -52,8 +46,8 @@ class IBService(IBWrapper, IBClient): can easily create mock instances of the IBService for testing. """ def __init__(self): - IBWrapper.__init__(self) - IBClient.__init__(self, wrapper=self) + EWrapper.__init__(self) + EClient.__init__(self, wrapper=self) self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] From c46d051bdbc58577102845e7ef075807b5f95aa0 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 25 Feb 2017 18:13:04 +1100 Subject: [PATCH 05/35] Added stub test, partially implement historic prices and ib_bar price handler --- qstrader/price_handler/ib_bar.py | 100 +++++++++++++++++++++---------- qstrader/service/ib.py | 29 +-------- tests/test_ib_price_handler.py | 3 +- 3 files changed, 74 insertions(+), 58 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 977bc28a..ff3080ed 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -1,75 +1,115 @@ import os import datetime +import queue from .base import AbstractBarPriceHandler +from qstrader.service.ib import IBService + from ibapi.client import EClient from ibapi.wrapper import EWrapper +from ibapi.utils import iswrapper from ibapi.contract import * +from ibapi.common import * -import logging - -class IBClient(EClient): - def __init__(self, wrapper): - EClient.__init__(self, wrapper) -class IBWrapper(EWrapper): - def __init__(self): - EWrapper.__init__(self) +#types +from ibapi.utils import (current_fn_name, BadMessage) +from ibapi.common import * +from ibapi.order_condition import * +from ibapi.contract import * +from ibapi.order import * +from ibapi.order_state import * +from ibapi.execution import Execution +from ibapi.execution import ExecutionFilter +from ibapi.commission_report import CommissionReport +from ibapi.scanner import ScannerSubscription +from ibapi.ticktype import * - def historicalData(self, reqId:int, date:str, open:float, high:float, - low: float, close:float, volume: int, barCount: int, - WAP:float, hasGaps: int): - logging.error("RECEIVED HISTORICAL DATA BAR.") +from ibapi.account_summary_tags import * -class IBBarPriceHandler(AbstractBarPriceHandler, IBWrapper, IBClient): +class IBBarPriceHandler(AbstractBarPriceHandler): """ Designed to feed either live or historic market data bars from an Interactive Brokers connection. - Each "IB-connected" module requires its 'client' and 'wrapper' methods. + Uses the IBService to make requests and collect data once responses have returned. TODO: * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode + * IBService should be an initialization parameter + * Ports, etc, connection strings from config """ def __init__( - self, events_queue, tickers, settings, mode="historic", + self, events_queue, param_tickers, settings, mode="historic", hist_end_date = datetime.datetime.now() - datetime.timedelta(days=3), - hist_duration="1 D", hist_barsize="1 min" + hist_duration="5 D", hist_barsize="1 min" ): - logging.basicConfig( level=logging.ERROR ) - - self.ib_wrapper = IBWrapper() - self.ib_client = IBClient(self.ib_wrapper) - - self.ib_client.connect("127.0.0.1",4001,0) - - self.tickers = {} + self.ib_service = IBService() + self.ib_service.connect("127.0.0.1",4001,0) self.mode = mode self.continue_backtest = True self.hist_end_date = hist_end_date self.hist_duration = hist_duration self.hist_barsize = hist_barsize - for ticker in tickers: + # The position of a ticker in this dict is used as its IB ID. + self.tickers = {} # TODO gross + self.ticker_lookup = {} + + for ticker in param_tickers: # TODO gross param_tickers -- combine above? self._subscribe_ticker(ticker) - import time - time.sleep(10) + self._wait_for_hist_population() def _subscribe_ticker(self, ticker): + """ + Request ticker data from IB + """ if ticker not in self.tickers: # Set up an IB ContractS contract = Contract() contract.exchange = "SMART" contract.symbol = ticker contract.secType = "STK" - contract.currency = "USD" + contract.currency = "AUD" if self.mode == "historic": + ib_ticker_id = len(self.tickers) end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") - self.ib_client.reqHistoricalData( - 0, contract, end_time, self.hist_duration, self.hist_barsize, + self.ib_service.reqHistoricalData( + ib_ticker_id, contract, end_time, self.hist_duration, self.hist_barsize, "TRADES", True, 2, None) + + # TODO gross + self.ticker_lookup[len(self.tickers)] = ticker + self.tickers[ticker] = {} + + + def _wait_for_hist_population(self): + """ + # TODO this *needs* to be an infinite loop running inside the service, + # with the service launched on a new Thread or Process + """ + while (self.ib_service.conn.isConnected() or not self.ib_service.msg_queue.empty()) and len(self.ib_service.waitingHistoricalData) != 0: + try: + text = self.ib_service.msg_queue.get(block=True, timeout=0.2) + if len(text) > MAX_MSG_LEN: + self.ib_service.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), + "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) + self.ib_service.disconnect() + break + except queue.Empty: + print("queue.get: empty") + else: + fields = comm.read_fields(text) + print("fields %s", fields) + self.ib_service.decoder.interpret(fields) + + print("conn:%d queue.sz:%d", + self.ib_service.conn.isConnected(), + self.ib_service.msg_queue.qsize()) + + self.ib_service.disconnect() diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index fa1d3e7f..eb4a5189 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -52,31 +52,6 @@ def __init__(self): self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] - # Connect to IB and make the historic data request. - # self.connect("127.0.0.1", 4001, clientId=0) - # self.historicalDataRequests_req() - # - # while (self.conn.isConnected() or not self.msg_queue.empty()) and len(self.waitingHistoricalData) != 0: - # try: - # text = self.msg_queue.get(block=True, timeout=0.2) - # if len(text) > MAX_MSG_LEN: - # self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), - # "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) - # self.disconnect() - # break - # except queue.Empty: - # logging.debug("queue.get: empty") - # else: - # fields = comm.read_fields(text) - # logging.debug("fields %s", fields) - # self.decoder.interpret(fields) - # - # # print("conn:%d queue.sz:%d", - # # self.conn.isConnected(), - # # self.msg_queue.qsize()) - # - # self.disconnect() - def error(self, reqId:TickerId, errorCode:int, errorString:str): super().error(reqId, errorCode, errorString) @@ -102,7 +77,9 @@ def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, def historicalData(self, reqId:TickerId , date:str, open:float, high:float, low:float, close:float, volume:int, barCount:int, WAP:float, hasGaps:int): - print("RECEIVED HISTORIC DATA") + print("HistoricalData. ", reqId, " Date:", date, "Open:", open, + "High:", high, "Low:", low, "Close:", close, "Volume:", volume, + "Count:", barCount, "WAP:", WAP, "HasGaps:", hasGaps) self.historicalDataQueue.put((reqId, date, open, high, low, close, volume, barCount, WAP, hasGaps)) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 2443345f..700e58aa 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -4,7 +4,6 @@ from qstrader.compat import queue from qstrader import settings - class TestPriceHandlerSimpleCase(unittest.TestCase): def setUp(self): """ @@ -14,7 +13,7 @@ def setUp(self): self.config = settings.TEST fixtures_path = self.config.CSV_DATA_DIR events_queue = queue.Queue() - init_tickers = ["FB"] + init_tickers = ["CBA"] self.price_handler = IBBarPriceHandler( events_queue, init_tickers, self.config ) From ffe293852306498be9cf5dd40223538dcfecb14c Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sun, 26 Feb 2017 12:43:09 +1100 Subject: [PATCH 06/35] Remove print statements --- qstrader/service/ib.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index eb4a5189..fe6b7742 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -65,7 +65,6 @@ def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, durationStr:str, barSizeSetting:str, whatToShow:str, useRTH:int, formatDate:int, chartOptions:TagValueList): self.waitingHistoricalData.append(reqId) - print("REQUESTING HISTORIC, WAITING FOR %s" % len(self.waitingHistoricalData)) super().reqHistoricalData( reqId, contract, endDateTime, durationStr, barSizeSetting, whatToShow, useRTH, formatDate, chartOptions) @@ -77,9 +76,6 @@ def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, def historicalData(self, reqId:TickerId , date:str, open:float, high:float, low:float, close:float, volume:int, barCount:int, WAP:float, hasGaps:int): - print("HistoricalData. ", reqId, " Date:", date, "Open:", open, - "High:", high, "Low:", low, "Close:", close, "Volume:", volume, - "Count:", barCount, "WAP:", WAP, "HasGaps:", hasGaps) self.historicalDataQueue.put((reqId, date, open, high, low, close, volume, barCount, WAP, hasGaps)) @@ -88,5 +84,4 @@ def historicalData(self, reqId:TickerId , date:str, open:float, high:float, TODO: Will it work with multiple historical requests for same symbol? """ def historicalDataEnd(self, reqId:int, start:str, end:str): - print("FINISHED FOR %s" % reqId) self.waitingHistoricalData.remove(reqId) From deaf6b4cf2d01512e439ad7ac57cd7882f084753 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sun, 26 Feb 2017 12:45:49 +1100 Subject: [PATCH 07/35] IBPriceHandler should now work for at least one historic request --- qstrader/price_handler/ib_bar.py | 76 ++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index ff3080ed..805680ff 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -1,8 +1,11 @@ import os import datetime import queue +import pandas as pd from .base import AbstractBarPriceHandler +from ..event import BarEvent +from ..price_parser import PriceParser from qstrader.service.ib import IBService from ibapi.client import EClient @@ -40,6 +43,7 @@ class IBBarPriceHandler(AbstractBarPriceHandler): * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode * IBService should be an initialization parameter * Ports, etc, connection strings from config + * Work with live market data """ def __init__( self, events_queue, param_tickers, settings, mode="historic", @@ -48,11 +52,29 @@ def __init__( ): self.ib_service = IBService() self.ib_service.connect("127.0.0.1",4001,0) + self.barsize_lookup = { + "1 sec": 1, + "5 secs": 5, + "15 secs": 15, + "30 secs": 30, + "1 min": 60, + "2 mins": 120, + "3 mins": 180, + "5 mins": 300, + "15 mins": 900, + "30 mins": 1800, + "1 hour": 3600, + "8 hours": 28800, + "1 day": 86400 + } + self.bar_stream = queue.Queue() + self.events_queue = events_queue self.mode = mode self.continue_backtest = True self.hist_end_date = hist_end_date self.hist_duration = hist_duration - self.hist_barsize = hist_barsize + self.qst_barsize = self.barsize_lookup[hist_barsize] + self.ib_barsize = hist_barsize # The position of a ticker in this dict is used as its IB ID. self.tickers = {} # TODO gross @@ -62,7 +84,9 @@ def __init__( self._subscribe_ticker(ticker) self._wait_for_hist_population() + self._merge_sort_ticker_data() + import pdb; pdb.set_trace() def _subscribe_ticker(self, ticker): """ @@ -80,7 +104,7 @@ def _subscribe_ticker(self, ticker): ib_ticker_id = len(self.tickers) end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") self.ib_service.reqHistoricalData( - ib_ticker_id, contract, end_time, self.hist_duration, self.hist_barsize, + ib_ticker_id, contract, end_time, self.hist_duration, self.ib_barsize, "TRADES", True, 2, None) # TODO gross @@ -111,5 +135,51 @@ def _wait_for_hist_population(self): print("conn:%d queue.sz:%d", self.ib_service.conn.isConnected(), self.ib_service.msg_queue.qsize()) - self.ib_service.disconnect() + + + def _merge_sort_ticker_data(self): + """ + Collects all the equities data from thte IBService, and populates the + member Queue `self.bar_stream`. This queue is used for the `stream_next()` + function. + """ + historicalData = [] + while not self.ib_service.historicalDataQueue.empty(): + historicalData.append(self.ib_service.historicalDataQueue.get()) + historicalData = sorted(historicalData, key=lambda x: x[1]) + for bar_tuple in historicalData: + self.bar_stream.put(bar_tuple) + + def _create_event(self, mkt_event): + """ + mkt_event is a tuple created according to the format: + http:////www.interactivebrokers.com/en/software/api/apiguide/java/historicaldata.htm + """ + ticker = self.ticker_lookup[mkt_event[0]] + time = datetime.datetime.fromtimestamp(int(mkt_event[1])) + barsize = self.qst_barsize + open_price = PriceParser.parse(mkt_event[2]) + high_price = PriceParser.parse(mkt_event[3]) + low_price = PriceParser.parse(mkt_event[4]) + close_price = PriceParser.parse(mkt_event[5]) + adj_close_price = PriceParser.parse(mkt_event[5]) # TODO redundant? + volume = mkt_event[6] + return BarEvent( + ticker, time, barsize, open_price, high_price, + low_price, close_price, volume, adj_close_price + ) + + + def stream_next(self): + """ + Create the next BarEvent and place it onto the event queue. + """ + mkt_event = self.bar_stream.get() + if self.bar_stream.empty(): + self.continue_backtest = False + else: + # Create, store and return the bar event. + bev = self._create_event(mkt_event); + self._store_event(bev) + self.events_queue.put(bev) From 7bc7f540ac77bbb5dc4de9454b11fdd6ba2c3aa9 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sun, 26 Feb 2017 12:56:22 +1100 Subject: [PATCH 08/35] remove debug statement --- qstrader/price_handler/ib_bar.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 805680ff..590c2e66 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -86,8 +86,6 @@ def __init__( self._wait_for_hist_population() self._merge_sort_ticker_data() - import pdb; pdb.set_trace() - def _subscribe_ticker(self, ticker): """ Request ticker data from IB From 84a1aa6c3889b6bf6dba5239df7bd346263dec56 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 10:57:28 +1100 Subject: [PATCH 09/35] IBService implements threading interface to run the infinite message handling loop. --- qstrader/price_handler/ib_bar.py | 33 ++++++++++++-------------------- qstrader/service/ib.py | 33 ++++++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 590c2e66..f49b7416 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -52,6 +52,10 @@ def __init__( ): self.ib_service = IBService() self.ib_service.connect("127.0.0.1",4001,0) + + # TODO move outside this class, call when backtest or live trading is finished + self.ib_service.start() + self.barsize_lookup = { "1 sec": 1, "5 secs": 5, @@ -112,28 +116,15 @@ def _subscribe_ticker(self, ticker): def _wait_for_hist_population(self): """ - # TODO this *needs* to be an infinite loop running inside the service, - # with the service launched on a new Thread or Process + Blocks until the historical dataset has been populated. """ - while (self.ib_service.conn.isConnected() or not self.ib_service.msg_queue.empty()) and len(self.ib_service.waitingHistoricalData) != 0: - try: - text = self.ib_service.msg_queue.get(block=True, timeout=0.2) - if len(text) > MAX_MSG_LEN: - self.ib_service.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), - "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) - self.ib_service.disconnect() - break - except queue.Empty: - print("queue.get: empty") - else: - fields = comm.read_fields(text) - print("fields %s", fields) - self.ib_service.decoder.interpret(fields) - - print("conn:%d queue.sz:%d", - self.ib_service.conn.isConnected(), - self.ib_service.msg_queue.qsize()) - self.ib_service.disconnect() + while len(self.ib_service.waitingHistoricalData) != 0: + pass + + # TODO move outside this class, call when backtest or live trading is finished + self.ib_service.stop_event.set() + self.ib_service.join() + def _merge_sort_ticker_data(self): diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index fe6b7742..83e585b2 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -4,7 +4,7 @@ import collections import inspect import queue - +import threading import logging import time import os.path @@ -29,7 +29,7 @@ from ibapi.account_summary_tags import * -class IBService(EWrapper, EClient): +class IBService(EWrapper, EClient, threading.Thread): """ The IBService is the primary conduit of data from QStrader to Interactive Brokers. This service provides functions to request data, and allows for @@ -44,10 +44,14 @@ class IBService(EWrapper, EClient): methods offered in this class. This ensures that the logic required to talk with IB is contained within this class exclusively, with the added benefit that we can easily create mock instances of the IBService for testing. + + TODO: document usage (starting, stopping, using) """ def __init__(self): EWrapper.__init__(self) EClient.__init__(self, wrapper=self) + threading.Thread.__init__(self, name='IBService') + self.stop_event = threading.Event() self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] @@ -85,3 +89,28 @@ def historicalData(self, reqId:TickerId , date:str, open:float, high:float, """ def historicalDataEnd(self, reqId:int, start:str, end:str): self.waitingHistoricalData.remove(reqId) + + + """ + TODO document + """ + def run(self): + while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set() : + try: + text = self.msg_queue.get(block=True, timeout=0.2) + if len(text) > MAX_MSG_LEN: + self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), + "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) + self.disconnect() + break + except queue.Empty: + print("queue.get: empty") + else: + fields = comm.read_fields(text) + print("fields %s", fields) + self.decoder.interpret(fields) + + print("conn:%d queue.sz:%d", + self.conn.isConnected(), + self.msg_queue.qsize()) + self.disconnect() From 9519bc03dd31eb43af855102d27df320237ec52e Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 11:09:10 +1100 Subject: [PATCH 10/35] Moved IBService creation, setup and stop outside of the price handler --- qstrader/price_handler/ib_bar.py | 14 ++------------ qstrader/service/ib.py | 2 +- tests/test_ib_price_handler.py | 16 +++++++++++++++- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index f49b7416..8d7cefa4 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -46,16 +46,11 @@ class IBBarPriceHandler(AbstractBarPriceHandler): * Work with live market data """ def __init__( - self, events_queue, param_tickers, settings, mode="historic", + self, ib_service, events_queue, param_tickers, settings, mode="historic", hist_end_date = datetime.datetime.now() - datetime.timedelta(days=3), hist_duration="5 D", hist_barsize="1 min" ): - self.ib_service = IBService() - self.ib_service.connect("127.0.0.1",4001,0) - - # TODO move outside this class, call when backtest or live trading is finished - self.ib_service.start() - + self.ib_service = ib_service self.barsize_lookup = { "1 sec": 1, "5 secs": 5, @@ -121,11 +116,6 @@ def _wait_for_hist_population(self): while len(self.ib_service.waitingHistoricalData) != 0: pass - # TODO move outside this class, call when backtest or live trading is finished - self.ib_service.stop_event.set() - self.ib_service.join() - - def _merge_sort_ticker_data(self): """ diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index 83e585b2..ad60cec8 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -95,7 +95,7 @@ def historicalDataEnd(self, reqId:int, start:str, end:str): TODO document """ def run(self): - while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set() : + while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set(): try: text = self.msg_queue.get(block=True, timeout=0.2) if len(text) > MAX_MSG_LEN: diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 700e58aa..d8936909 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -2,6 +2,7 @@ from qstrader.price_handler.ib_bar import IBBarPriceHandler from qstrader.compat import queue +from qstrader.service.ib import IBService from qstrader import settings class TestPriceHandlerSimpleCase(unittest.TestCase): @@ -9,15 +10,28 @@ def setUp(self): """ Set up the PriceHandler object with a small set of initial tickers for a backtest in historic mode. + + For now, while testing locally with IB (i.e. not Travis), implement + all real IB functionality (i.e. set up service) """ + self.ib_service = IBService() + self.ib_service.connect("127.0.0.1",4001,0) # TODO remove & replace with mock when happy + self.ib_service.start() + + self.config = settings.TEST fixtures_path = self.config.CSV_DATA_DIR events_queue = queue.Queue() init_tickers = ["CBA"] self.price_handler = IBBarPriceHandler( - events_queue, init_tickers, self.config + self.ib_service, events_queue, init_tickers, self.config ) + def tearDown(self): + self.ib_service.stop_event.set() + self.ib_service.join() + + def test(self): self.assertEqual(1,2) From 749e13c04040188c289f50558352ce09e7d848cc Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 11:33:49 +1100 Subject: [PATCH 11/35] Ensure tradelog is created if it does not exist --- qstrader/compliance/example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qstrader/compliance/example.py b/qstrader/compliance/example.py index e66738fb..f4af9a32 100644 --- a/qstrader/compliance/example.py +++ b/qstrader/compliance/example.py @@ -40,7 +40,7 @@ def __init__(self, config): "commission" ] fname = os.path.expanduser(os.path.join(self.config.OUTPUT_DIR, self.csv_filename)) - with open(fname, 'a') as csvfile: + with open(fname, 'a+') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() @@ -49,7 +49,7 @@ def record_trade(self, fill): Append all details about the FillEvent to the CSV trade log. """ fname = os.path.expanduser(os.path.join(self.config.OUTPUT_DIR, self.csv_filename)) - with open(fname, 'a') as csvfile: + with open(fname, 'a+') as csvfile: writer = csv.writer(csvfile) writer.writerow([ fill.timestamp, fill.ticker, From fec381556b81b1f3efb6e1022b97df4df34878e3 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 11:47:28 +1100 Subject: [PATCH 12/35] Fix some strange kind of systemerror that appeared using Python 3.5 --- qstrader/statistics/performance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qstrader/statistics/performance.py b/qstrader/statistics/performance.py index 771da596..da081bb0 100644 --- a/qstrader/statistics/performance.py +++ b/qstrader/statistics/performance.py @@ -63,7 +63,7 @@ def create_sortino_ratio(returns, periods=252): returns - A pandas Series representing period percentage returns. periods - Daily (252), Hourly (252*6.5), Minutely(252*6.5*60) etc. """ - return np.sqrt(periods) * (np.mean(returns)) / np.std(returns[returns < 0]) + return np.sqrt(periods) * (np.mean(returns)) / np.std(returns.ix[returns < 0]) def create_drawdowns(returns): From 2a2fb4bbf31690445617901a0bc1d9213a05255f Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 11:47:58 +1100 Subject: [PATCH 13/35] Add a few notes --- qstrader/price_handler/ib_bar.py | 2 +- qstrader/service/ib.py | 8 ++------ tests/test_ib_price_handler.py | 1 + 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 8d7cefa4..694ffa7e 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -95,7 +95,7 @@ def _subscribe_ticker(self, ticker): contract.exchange = "SMART" contract.symbol = ticker contract.secType = "STK" - contract.currency = "AUD" + contract.currency = "AUD" # TODO -- Should PriceHandler take in a list of contracts? if self.mode == "historic": ib_ticker_id = len(self.tickers) diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index ad60cec8..22d2e508 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -92,7 +92,7 @@ def historicalDataEnd(self, reqId:int, start:str, end:str): """ - TODO document + TODO document usage """ def run(self): while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set(): @@ -104,13 +104,9 @@ def run(self): self.disconnect() break except queue.Empty: - print("queue.get: empty") + pass # TODO something more appropriate else: fields = comm.read_fields(text) - print("fields %s", fields) self.decoder.interpret(fields) - print("conn:%d queue.sz:%d", - self.conn.isConnected(), - self.msg_queue.qsize()) self.disconnect() diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index d8936909..0fa3cac0 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -26,6 +26,7 @@ def setUp(self): self.price_handler = IBBarPriceHandler( self.ib_service, events_queue, init_tickers, self.config ) + def tearDown(self): self.ib_service.stop_event.set() From 657b016ba0c3e2b827a5e13e402282dec49ab401 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 8 Mar 2017 11:50:44 +1100 Subject: [PATCH 14/35] Add a fwe notes --- qstrader/price_handler/ib_bar.py | 3 +-- tests/test_ib_price_handler.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 694ffa7e..6a03b09f 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -41,9 +41,8 @@ class IBBarPriceHandler(AbstractBarPriceHandler): TODO: * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode - * IBService should be an initialization parameter - * Ports, etc, connection strings from config * Work with live market data + * Ensure works with multiple tickers """ def __init__( self, ib_service, events_queue, param_tickers, settings, mode="historic", diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 0fa3cac0..2fe1c525 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -13,6 +13,16 @@ def setUp(self): For now, while testing locally with IB (i.e. not Travis), implement all real IB functionality (i.e. set up service) + + + + TODO: + * Mock IB Service, populate dummy data. + * Duplicate tests from test_price_handler + * Add test to all price_handlers for get_last_close() + * Test multiple tickers + * Test multiple timeframes + * Test returned date formats """ self.ib_service = IBService() self.ib_service.connect("127.0.0.1",4001,0) # TODO remove & replace with mock when happy @@ -26,7 +36,7 @@ def setUp(self): self.price_handler = IBBarPriceHandler( self.ib_service, events_queue, init_tickers, self.config ) - + def tearDown(self): self.ib_service.stop_event.set() From c806a64432c979cb8ad27b82501bc1a357f22b7b Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Thu, 9 Mar 2017 08:42:00 +1100 Subject: [PATCH 15/35] Add some instructions for usage to the IBService --- qstrader/service/ib.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index 22d2e508..73d5d80d 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -45,14 +45,21 @@ class IBService(EWrapper, EClient, threading.Thread): is contained within this class exclusively, with the added benefit that we can easily create mock instances of the IBService for testing. - TODO: document usage (starting, stopping, using) + Several calls must be made to the IBService in order for it to run correctly. + These should be called from the user's main `trading-session` script. + + An IBService object must be instantiated, immediately followed by the .connect() + call, immediately followed by the .start() call, which spawns a thread for the run loop. + When the trading session is complete, the service should be stopped gracefully by + calling ibservice.stop_event.set() to break the infinite loop, and ibservice.join() + to wait for the thread to close. """ def __init__(self): EWrapper.__init__(self) EClient.__init__(self, wrapper=self) threading.Thread.__init__(self, name='IBService') self.stop_event = threading.Event() - + # Set up data queues. self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] @@ -92,7 +99,9 @@ def historicalDataEnd(self, reqId:int, start:str, end:str): """ - TODO document usage + Overridden from the Threading class. Infinite loop which handles + message passing from IB to QSTrader. This loop is run in new thread when + started. """ def run(self): while (self.conn.isConnected() or not self.msg_queue.empty()) and not self.stop_event.is_set(): From 960cf79ee39d0bd4a8d59e985faf35d4bd63aab9 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Thu, 9 Mar 2017 08:51:05 +1100 Subject: [PATCH 16/35] Added an example IB Historic backtest --- examples/buy_and_hold_historic_ib.py | 84 ++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 examples/buy_and_hold_historic_ib.py diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py new file mode 100644 index 00000000..2958350e --- /dev/null +++ b/examples/buy_and_hold_historic_ib.py @@ -0,0 +1,84 @@ +import datetime + +from qstrader import settings +from qstrader.strategy.base import AbstractStrategy +from qstrader.event import SignalEvent, EventType +from qstrader.compat import queue +from qstrader.trading_session.backtest import Backtest +from qstrader.service.ib import IBService +from qstrader.price_handler.ib_bar import IBBarPriceHandler + +class BuyAndHoldStrategy(AbstractStrategy): + """ + A testing strategy that simply purchases (longs) any asset that + matches what was passed in on initialization and + then holds until the completion of a backtest. + """ + def __init__( + self, tickers, events_queue, + base_quantity=100 + ): + self.tickers = tickers + self.invested = dict.fromkeys(tickers) + self.events_queue = events_queue + self.base_quantity = base_quantity + + def calculate_signals(self, event): + if ( + event.type in [EventType.BAR, EventType.TICK] and + event.ticker in self.tickers + ): + if not self.invested[event.ticker]: + signal = SignalEvent( + event.ticker, "BOT", + suggested_quantity=self.base_quantity + ) + self.events_queue.put(signal) + self.invested[event.ticker] = True + + +def run(config, testing, tickers, filename): + # Backtest information + title = ['Buy and Hold Example on %s' % tickers[0]] + initial_equity = 10000.0 + start_date = datetime.datetime(2000, 1, 1) + end_date = datetime.datetime(2014, 1, 1) + events_queue = queue.Queue() + + # Set up IBService + ib_service = IBService() + ib_service.connect("127.0.0.1", 4001, 0) # TODO from config + ib_service.start() + + # Set up the IB PriceHandler + price_handler = IBBarPriceHandler( + ib_service, events_queue, tickers, config + ) + + # Use the Buy and Hold Strategy + strategy = BuyAndHoldStrategy(tickers, events_queue) + + # Set up the backtest + backtest = Backtest( + config, strategy, tickers, + initial_equity, start_date, end_date, + events_queue, price_handler, title=title + ) + results = backtest.simulate_trading(testing=testing) + + # Disconnect from services + ib_service.stop_event.set() + ib_service.join() + + return results + + +if __name__ == "__main__": + # Configuration data + testing = False + config = settings.from_file( + settings.DEFAULT_CONFIG_FILENAME, testing + ) + tickers = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + filename = None + run(config, testing, tickers, filename) From e38a59a31dae130aa32909030af41eb5fd47bc28 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Thu, 9 Mar 2017 09:26:02 +1100 Subject: [PATCH 17/35] The IBPriceHandler must take in a list of symbols for subscription, rather than guessing how to create IB Contracts based on symbols+user's config --- examples/buy_and_hold_historic_ib.py | 28 +++++++++++++---- qstrader/price_handler/ib_bar.py | 45 +++++++++++++--------------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py index 2958350e..45b234b6 100644 --- a/examples/buy_and_hold_historic_ib.py +++ b/examples/buy_and_hold_historic_ib.py @@ -7,6 +7,7 @@ from qstrader.trading_session.backtest import Backtest from qstrader.service.ib import IBService from qstrader.price_handler.ib_bar import IBBarPriceHandler +from ibapi.contract import * class BuyAndHoldStrategy(AbstractStrategy): """ @@ -39,10 +40,8 @@ def calculate_signals(self, event): def run(config, testing, tickers, filename): # Backtest information - title = ['Buy and Hold Example on %s' % tickers[0]] + title = ['Buy and Hold Historic IB Example'] initial_equity = 10000.0 - start_date = datetime.datetime(2000, 1, 1) - end_date = datetime.datetime(2014, 1, 1) events_queue = queue.Queue() # Set up IBService @@ -50,14 +49,33 @@ def run(config, testing, tickers, filename): ib_service.connect("127.0.0.1", 4001, 0) # TODO from config ib_service.start() - # Set up the IB PriceHandler + # Set up IB Contract objects for the PriceHandler + # MORE INFO: https://www.interactivebrokers.com/en/?f=%2Fen%2Fgeneral%2Fcontact%2FtipsContractsDatabaseSearch.php%3Fib_entity%3Dllc + symbols = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + contracts = [] + for symbol in symbols: + contract = Contract() + contract.exchange = "SMART" + contract.symbol = symbol + contract.secType = "STK" + contract.currency = "AUD" + contracts.append(contract) + + # Set up the IB PriceHandler. Want 5 day's of minute bars, up to yesterday. + # Look at IB Documentation for possible values. + end_date = datetime.datetime.now() - datetime.timedelta(days=1) price_handler = IBBarPriceHandler( - ib_service, events_queue, tickers, config + ib_service, events_queue, contracts, config, + "historic", end_date, hist_duration="5 D", hist_barsize="1 min" ) # Use the Buy and Hold Strategy strategy = BuyAndHoldStrategy(tickers, events_queue) + # Start/End TODO redundant -- only required for default (Yahoo) price handler + start_date = datetime.datetime(2000, 1, 1) + end_date = datetime.datetime(2014, 1, 1) + # Set up the backtest backtest = Backtest( config, strategy, tickers, diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 6a03b09f..69077f88 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -39,13 +39,14 @@ class IBBarPriceHandler(AbstractBarPriceHandler): Uses the IBService to make requests and collect data once responses have returned. + `param_contracts` must be a list of IB Contract objects. + TODO: * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode * Work with live market data - * Ensure works with multiple tickers """ def __init__( - self, ib_service, events_queue, param_tickers, settings, mode="historic", + self, ib_service, events_queue, param_contracts, settings, mode="historic", hist_end_date = datetime.datetime.now() - datetime.timedelta(days=3), hist_duration="5 D", hist_barsize="1 min" ): @@ -65,6 +66,7 @@ def __init__( "8 hours": 28800, "1 day": 86400 } + self.tickers = {} # Required to be populated for some parent methods. self.bar_stream = queue.Queue() self.events_queue = events_queue self.mode = mode @@ -74,38 +76,33 @@ def __init__( self.qst_barsize = self.barsize_lookup[hist_barsize] self.ib_barsize = hist_barsize - # The position of a ticker in this dict is used as its IB ID. - self.tickers = {} # TODO gross - self.ticker_lookup = {} + # The position of a contract in this dict is used as its IB ID. + self.contracts = {} # TODO gross + self.contract_lookup = {} - for ticker in param_tickers: # TODO gross param_tickers -- combine above? - self._subscribe_ticker(ticker) + for contract in param_contracts: # TODO gross param_contracts -- combine above? + self._subscribe_contract(contract) self._wait_for_hist_population() - self._merge_sort_ticker_data() + self._merge_sort_contract_data() - def _subscribe_ticker(self, ticker): + def _subscribe_contract(self, contract): """ - Request ticker data from IB + Request contract data from IB """ - if ticker not in self.tickers: - # Set up an IB ContractS - contract = Contract() - contract.exchange = "SMART" - contract.symbol = ticker - contract.secType = "STK" - contract.currency = "AUD" # TODO -- Should PriceHandler take in a list of contracts? + # Add ticker symbol, as required by some parent methods + self.tickers[contract.symbol] = {} if self.mode == "historic": - ib_ticker_id = len(self.tickers) + ib_contract_id = len(self.contracts) end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") self.ib_service.reqHistoricalData( - ib_ticker_id, contract, end_time, self.hist_duration, self.ib_barsize, + ib_contract_id, contract, end_time, self.hist_duration, self.ib_barsize, "TRADES", True, 2, None) # TODO gross - self.ticker_lookup[len(self.tickers)] = ticker - self.tickers[ticker] = {} + self.contract_lookup[len(self.contracts)] = contract.symbol + self.contracts[contract] = {} def _wait_for_hist_population(self): @@ -116,7 +113,7 @@ def _wait_for_hist_population(self): pass - def _merge_sort_ticker_data(self): + def _merge_sort_contract_data(self): """ Collects all the equities data from thte IBService, and populates the member Queue `self.bar_stream`. This queue is used for the `stream_next()` @@ -134,7 +131,7 @@ def _create_event(self, mkt_event): mkt_event is a tuple created according to the format: http:////www.interactivebrokers.com/en/software/api/apiguide/java/historicaldata.htm """ - ticker = self.ticker_lookup[mkt_event[0]] + symbol = self.contract_lookup[mkt_event[0]] time = datetime.datetime.fromtimestamp(int(mkt_event[1])) barsize = self.qst_barsize open_price = PriceParser.parse(mkt_event[2]) @@ -144,7 +141,7 @@ def _create_event(self, mkt_event): adj_close_price = PriceParser.parse(mkt_event[5]) # TODO redundant? volume = mkt_event[6] return BarEvent( - ticker, time, barsize, open_price, high_price, + symbol, time, barsize, open_price, high_price, low_price, close_price, volume, adj_close_price ) From b6417f26c56c2caf0d9368214ce0054adfb1a063 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 17:44:26 +1100 Subject: [PATCH 18/35] Started adding more test cases. Mocking IBService. --- tests/test_ib_price_handler.py | 47 ++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 2fe1c525..5d7d217b 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -1,9 +1,11 @@ import unittest +from unittest.mock import MagicMock from qstrader.price_handler.ib_bar import IBBarPriceHandler from qstrader.compat import queue from qstrader.service.ib import IBService from qstrader import settings +from ibapi.contract import * class TestPriceHandlerSimpleCase(unittest.TestCase): def setUp(self): @@ -14,8 +16,6 @@ def setUp(self): For now, while testing locally with IB (i.e. not Travis), implement all real IB functionality (i.e. set up service) - - TODO: * Mock IB Service, populate dummy data. * Duplicate tests from test_price_handler @@ -23,28 +23,47 @@ def setUp(self): * Test multiple tickers * Test multiple timeframes * Test returned date formats + * Test mocked live market data methods """ - self.ib_service = IBService() - self.ib_service.connect("127.0.0.1",4001,0) # TODO remove & replace with mock when happy - self.ib_service.start() - - + self.ib_service = MagicMock() self.config = settings.TEST fixtures_path = self.config.CSV_DATA_DIR events_queue = queue.Queue() - init_tickers = ["CBA"] + + # Set up an IB Contract/ + contract = Contract() + contract.exchange = "SMART" + contract.symbol = "CBA" + contract.secType = "STK" + contract.currency = "AUD" + + # Create the price handler. self.price_handler = IBBarPriceHandler( - self.ib_service, events_queue, init_tickers, self.config + self.ib_service, events_queue, [contract], self.config ) + def test_stream_all_historic_events(self): + """ + Will test that: + * historic data is collected from IBService + * historic data is merge sorted correctly + * historic data is streamed out correctly + """ + self.assertEqual(1, 2) + + def test_made_historical_requests(self): + self.assertEqual(1, 2) + + def test_can_handle_all_bar_sizes(self): + self.assertEqual(1, 2) + + def test_can_do_reqid_to_ticker_lookup(self): + self.assertEqual(1, 2) - def tearDown(self): - self.ib_service.stop_event.set() - self.ib_service.join() + def test_get_best_bid_ask(self): + self.assertEqual(1, 2) - def test(self): - self.assertEqual(1,2) From 39734f0431bc89ab88af87198fd96af658878ccb Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 18:47:32 +1100 Subject: [PATCH 19/35] Fix bug in IB Price Handler's timestamps, create a whole mock object rather than trying Python mocking. --- qstrader/price_handler/ib_bar.py | 3 +- tests/test_ib_price_handler.py | 83 +++++++++++++++++++++++++++----- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 69077f88..6e05663a 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -126,13 +126,14 @@ def _merge_sort_contract_data(self): for bar_tuple in historicalData: self.bar_stream.put(bar_tuple) + def _create_event(self, mkt_event): """ mkt_event is a tuple created according to the format: http:////www.interactivebrokers.com/en/software/api/apiguide/java/historicaldata.htm """ symbol = self.contract_lookup[mkt_event[0]] - time = datetime.datetime.fromtimestamp(int(mkt_event[1])) + time = pd.Timestamp(int(mkt_event[1]) * 1e9) barsize = self.qst_barsize open_price = PriceParser.parse(mkt_event[2]) high_price = PriceParser.parse(mkt_event[3]) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 5d7d217b..7e258235 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -1,12 +1,42 @@ import unittest -from unittest.mock import MagicMock - +import mock +import queue +import numpy as np +from qstrader.price_parser import PriceParser from qstrader.price_handler.ib_bar import IBBarPriceHandler from qstrader.compat import queue from qstrader.service.ib import IBService from qstrader import settings from ibapi.contract import * +closes = np.arange(80.00, 91.00, 1) + +class IBServiceMock(object): + def __init__(self): + self.historicalDataQueue = queue.Queue() + self.waitingHistoricalData = [] + self.countHistoricalRequestsMade = 0 + # Populate some historic data for the mock service. + # Starting at 2017-01-01 13:00:00, 1 minute bars. + timestamp = 1483275600 + # CBA mock data + for i in range(0,10): + self.historicalDataQueue.put((0, timestamp + (i * 60), + closes[i], closes[i]+1, + closes[i]-1, closes[i+1], + 1000000, 100, closes[i], False)) + # BHP mock data + for i in range(0,10): + self.historicalDataQueue.put((1, timestamp + (i * 60), + closes[i]/2, closes[i]+1/2, + (closes[i]-1)/2, closes[i+1]/2, + 1000000, 100, closes[i]/2, False)) + + def reqHistoricalData(*arg): + self.countHistoricalRequestsMade += 1 + + + class TestPriceHandlerSimpleCase(unittest.TestCase): def setUp(self): """ @@ -25,23 +55,30 @@ def setUp(self): * Test returned date formats * Test mocked live market data methods """ - self.ib_service = MagicMock() + self.ib_service = IBServiceMock() self.config = settings.TEST fixtures_path = self.config.CSV_DATA_DIR events_queue = queue.Queue() - # Set up an IB Contract/ - contract = Contract() - contract.exchange = "SMART" - contract.symbol = "CBA" - contract.secType = "STK" - contract.currency = "AUD" + # Set up an IB Contract for CBA and BHP + cba = Contract() + cba.exchange = "SMART" + cba.symbol = "CBA" + cba.secType = "STK" + cba.currency = "AUD" + + bhp = Contract() + bhp.exchange = "SMART" + bhp.symbol = "BHP" + bhp.secType = "STK" + bhp.currency = "AUD" # Create the price handler. self.price_handler = IBBarPriceHandler( - self.ib_service, events_queue, [contract], self.config + self.ib_service, events_queue, [cba,bhp], self.config ) + def test_stream_all_historic_events(self): """ Will test that: @@ -49,7 +86,31 @@ def test_stream_all_historic_events(self): * historic data is merge sorted correctly * historic data is streamed out correctly """ - self.assertEqual(1, 2) + # Test Bar #1 + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"].strftime( + "%Y-%m-%d %H:%M:%S" + ), + "2017-01-01 13:00:00" + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[1] # Close is next open + ) + + # Test Bar #2 + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"].strftime( + "%Y-%m-%d %H:%M:%S" + ), + "2017-01-01 13:00:00" + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[1]/2 # Close is next open + ) def test_made_historical_requests(self): self.assertEqual(1, 2) From 91165fa307c35493d3bf5bb9ca377d18035a200e Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 18:49:04 +1100 Subject: [PATCH 20/35] Ensure all historical requests are made --- tests/test_ib_price_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 7e258235..b50734cd 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -32,7 +32,7 @@ def __init__(self): (closes[i]-1)/2, closes[i+1]/2, 1000000, 100, closes[i]/2, False)) - def reqHistoricalData(*arg): + def reqHistoricalData(self, *arg): self.countHistoricalRequestsMade += 1 @@ -113,7 +113,7 @@ def test_stream_all_historic_events(self): ) def test_made_historical_requests(self): - self.assertEqual(1, 2) + self.assertEqual(self.ib_service.countHistoricalRequestsMade, 2) def test_can_handle_all_bar_sizes(self): self.assertEqual(1, 2) From 5a27a3af6e5a964b58ab7d280e43ebc31873c833 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 19:14:48 +1100 Subject: [PATCH 21/35] More intensive testing on streaming historic prices; fixed bug where we'd exit backtest iteration one event too early --- qstrader/price_handler/ib_bar.py | 10 +++--- tests/test_ib_price_handler.py | 57 +++++++++++++++++--------------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 6e05663a..0b2a1907 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -133,7 +133,7 @@ def _create_event(self, mkt_event): http:////www.interactivebrokers.com/en/software/api/apiguide/java/historicaldata.htm """ symbol = self.contract_lookup[mkt_event[0]] - time = pd.Timestamp(int(mkt_event[1]) * 1e9) + time = pd.Timestamp(int(mkt_event[1]) * 10**9) barsize = self.qst_barsize open_price = PriceParser.parse(mkt_event[2]) high_price = PriceParser.parse(mkt_event[3]) @@ -151,11 +151,11 @@ def stream_next(self): """ Create the next BarEvent and place it onto the event queue. """ - mkt_event = self.bar_stream.get() - if self.bar_stream.empty(): - self.continue_backtest = False - else: + try: # Create, store and return the bar event. + mkt_event = self.bar_stream.get(False) bev = self._create_event(mkt_event); self._store_event(bev) self.events_queue.put(bev) + except Queue.Empty: + self.continue_backtest = False diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index b50734cd..6f197dc5 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -2,6 +2,7 @@ import mock import queue import numpy as np +import pandas as pd from qstrader.price_parser import PriceParser from qstrader.price_handler.ib_bar import IBBarPriceHandler from qstrader.compat import queue @@ -9,16 +10,18 @@ from qstrader import settings from ibapi.contract import * + +# Starting at 2017-01-01 13:00:00, 1 minute bars. +timestamp = 1483275600 closes = np.arange(80.00, 91.00, 1) + class IBServiceMock(object): def __init__(self): self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] self.countHistoricalRequestsMade = 0 # Populate some historic data for the mock service. - # Starting at 2017-01-01 13:00:00, 1 minute bars. - timestamp = 1483275600 # CBA mock data for i in range(0,10): self.historicalDataQueue.put((0, timestamp + (i * 60), @@ -86,31 +89,31 @@ def test_stream_all_historic_events(self): * historic data is merge sorted correctly * historic data is streamed out correctly """ - # Test Bar #1 - self.price_handler.stream_next() - self.assertEqual( - self.price_handler.tickers["CBA"]["timestamp"].strftime( - "%Y-%m-%d %H:%M:%S" - ), - "2017-01-01 13:00:00" - ) - self.assertEqual( - PriceParser.display(self.price_handler.tickers["CBA"]["close"]), - closes[1] # Close is next open - ) - - # Test Bar #2 - self.price_handler.stream_next() - self.assertEqual( - self.price_handler.tickers["BHP"]["timestamp"].strftime( - "%Y-%m-%d %H:%M:%S" - ), - "2017-01-01 13:00:00" - ) - self.assertEqual( - PriceParser.display(self.price_handler.tickers["BHP"]["close"]), - closes[1]/2 # Close is next open - ) + for i in range(0, 10): + # Test CBA + try: + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"], + pd.Timestamp((timestamp + (i*60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[i+1] # Close is next open + ) + + # Test BHP + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"], + pd.Timestamp((timestamp + (i*60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[i+1]/2 # Close is next open + ) + except AssertionError: + import pdb; pdb.set_trace() def test_made_historical_requests(self): self.assertEqual(self.ib_service.countHistoricalRequestsMade, 2) From 922ffd05625e046731b220789a4768089b98ee6e Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 19:15:34 +1100 Subject: [PATCH 22/35] Remove debugging --- tests/test_ib_price_handler.py | 43 ++++++++++++++++------------------ 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 6f197dc5..8a8b5c14 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -91,29 +91,26 @@ def test_stream_all_historic_events(self): """ for i in range(0, 10): # Test CBA - try: - self.price_handler.stream_next() - self.assertEqual( - self.price_handler.tickers["CBA"]["timestamp"], - pd.Timestamp((timestamp + (i*60)) * 1e9) - ) - self.assertEqual( - PriceParser.display(self.price_handler.tickers["CBA"]["close"]), - closes[i+1] # Close is next open - ) - - # Test BHP - self.price_handler.stream_next() - self.assertEqual( - self.price_handler.tickers["BHP"]["timestamp"], - pd.Timestamp((timestamp + (i*60)) * 1e9) - ) - self.assertEqual( - PriceParser.display(self.price_handler.tickers["BHP"]["close"]), - closes[i+1]/2 # Close is next open - ) - except AssertionError: - import pdb; pdb.set_trace() + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"], + pd.Timestamp((timestamp + (i*60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[i+1] # Close is next open + ) + + # Test BHP + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"], + pd.Timestamp((timestamp + (i*60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[i+1]/2 # Close is next open + ) def test_made_historical_requests(self): self.assertEqual(self.ib_service.countHistoricalRequestsMade, 2) From 2a6b36a992ffe54ade3012cfc7bde4a160f2297a Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 19:16:51 +1100 Subject: [PATCH 23/35] Remote two test cases that were covered by another test --- tests/test_ib_price_handler.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 8a8b5c14..761bf788 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -118,12 +118,6 @@ def test_made_historical_requests(self): def test_can_handle_all_bar_sizes(self): self.assertEqual(1, 2) - def test_can_do_reqid_to_ticker_lookup(self): - self.assertEqual(1, 2) - - def test_get_best_bid_ask(self): - self.assertEqual(1, 2) - From 6d9c114549df7ed44e5a895269ea48055bd6bda8 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 19:21:02 +1100 Subject: [PATCH 24/35] Move some test cases to TODO's because I'm not yet sure the best way to handle them. --- qstrader/price_handler/ib_bar.py | 3 +++ tests/test_ib_price_handler.py | 14 -------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 0b2a1907..e5a1653b 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -44,6 +44,9 @@ class IBBarPriceHandler(AbstractBarPriceHandler): TODO: * Historic/Live mode to be set by whether QSTrader is in Backtest or Live mode * Work with live market data + * Raise exceptions if the user enters data that + IB won't like (i.e. barsize/duration string formats) + * Decide/discuss approaches to handle IB's simultaneous data feed limit. """ def __init__( self, ib_service, events_queue, param_contracts, settings, mode="historic", diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 761bf788..63feedba 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -46,16 +46,8 @@ def setUp(self): Set up the PriceHandler object with a small set of initial tickers for a backtest in historic mode. - For now, while testing locally with IB (i.e. not Travis), implement - all real IB functionality (i.e. set up service) - TODO: - * Mock IB Service, populate dummy data. - * Duplicate tests from test_price_handler - * Add test to all price_handlers for get_last_close() - * Test multiple tickers * Test multiple timeframes - * Test returned date formats * Test mocked live market data methods """ self.ib_service = IBServiceMock() @@ -115,12 +107,6 @@ def test_stream_all_historic_events(self): def test_made_historical_requests(self): self.assertEqual(self.ib_service.countHistoricalRequestsMade, 2) - def test_can_handle_all_bar_sizes(self): - self.assertEqual(1, 2) - - - - if __name__ == "__main__": unittest.main() From 60bfa5b935d1a247e88ce780beb8b27cd3957386 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 20:33:58 +1100 Subject: [PATCH 25/35] Linting --- examples/buy_and_hold_historic_ib.py | 7 ++-- qstrader/price_handler/ib_bar.py | 47 +++------------------- qstrader/service/ib.py | 59 ++++++++-------------------- tests/test_ib_price_handler.py | 34 +++++++--------- 4 files changed, 41 insertions(+), 106 deletions(-) diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py index 45b234b6..e7959e23 100644 --- a/examples/buy_and_hold_historic_ib.py +++ b/examples/buy_and_hold_historic_ib.py @@ -7,7 +7,8 @@ from qstrader.trading_session.backtest import Backtest from qstrader.service.ib import IBService from qstrader.price_handler.ib_bar import IBBarPriceHandler -from ibapi.contract import * +from ibapi.contract import Contract + class BuyAndHoldStrategy(AbstractStrategy): """ @@ -46,7 +47,7 @@ def run(config, testing, tickers, filename): # Set up IBService ib_service = IBService() - ib_service.connect("127.0.0.1", 4001, 0) # TODO from config + ib_service.connect("127.0.0.1", 4001, 0) # TODO from config ib_service.start() # Set up IB Contract objects for the PriceHandler @@ -72,7 +73,7 @@ def run(config, testing, tickers, filename): # Use the Buy and Hold Strategy strategy = BuyAndHoldStrategy(tickers, events_queue) - # Start/End TODO redundant -- only required for default (Yahoo) price handler + # Start/End TODO redundant -- only required for default (Yahoo) price handler. start_date = datetime.datetime(2000, 1, 1) end_date = datetime.datetime(2014, 1, 1) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index e5a1653b..89b6ef72 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -1,35 +1,9 @@ -import os import datetime import queue import pandas as pd from .base import AbstractBarPriceHandler from ..event import BarEvent - from ..price_parser import PriceParser -from qstrader.service.ib import IBService - -from ibapi.client import EClient -from ibapi.wrapper import EWrapper -from ibapi.utils import iswrapper -from ibapi.contract import * -from ibapi.common import * - - - -#types -from ibapi.utils import (current_fn_name, BadMessage) -from ibapi.common import * -from ibapi.order_condition import * -from ibapi.contract import * -from ibapi.order import * -from ibapi.order_state import * -from ibapi.execution import Execution -from ibapi.execution import ExecutionFilter -from ibapi.commission_report import CommissionReport -from ibapi.scanner import ScannerSubscription -from ibapi.ticktype import * - -from ibapi.account_summary_tags import * class IBBarPriceHandler(AbstractBarPriceHandler): @@ -50,7 +24,7 @@ class IBBarPriceHandler(AbstractBarPriceHandler): """ def __init__( self, ib_service, events_queue, param_contracts, settings, mode="historic", - hist_end_date = datetime.datetime.now() - datetime.timedelta(days=3), + hist_end_date=datetime.datetime.now() - datetime.timedelta(days=3), hist_duration="5 D", hist_barsize="1 min" ): self.ib_service = ib_service @@ -69,7 +43,7 @@ def __init__( "8 hours": 28800, "1 day": 86400 } - self.tickers = {} # Required to be populated for some parent methods. + self.tickers = {} # Required to be populated for some parent methods. self.bar_stream = queue.Queue() self.events_queue = events_queue self.mode = mode @@ -78,14 +52,11 @@ def __init__( self.hist_duration = hist_duration self.qst_barsize = self.barsize_lookup[hist_barsize] self.ib_barsize = hist_barsize - # The position of a contract in this dict is used as its IB ID. - self.contracts = {} # TODO gross + self.contracts = {} # TODO gross self.contract_lookup = {} - for contract in param_contracts: # TODO gross param_contracts -- combine above? self._subscribe_contract(contract) - self._wait_for_hist_population() self._merge_sort_contract_data() @@ -95,19 +66,16 @@ def _subscribe_contract(self, contract): """ # Add ticker symbol, as required by some parent methods self.tickers[contract.symbol] = {} - if self.mode == "historic": ib_contract_id = len(self.contracts) end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") self.ib_service.reqHistoricalData( ib_contract_id, contract, end_time, self.hist_duration, self.ib_barsize, "TRADES", True, 2, None) - # TODO gross self.contract_lookup[len(self.contracts)] = contract.symbol self.contracts[contract] = {} - def _wait_for_hist_population(self): """ Blocks until the historical dataset has been populated. @@ -115,7 +83,6 @@ def _wait_for_hist_population(self): while len(self.ib_service.waitingHistoricalData) != 0: pass - def _merge_sort_contract_data(self): """ Collects all the equities data from thte IBService, and populates the @@ -129,7 +96,6 @@ def _merge_sort_contract_data(self): for bar_tuple in historicalData: self.bar_stream.put(bar_tuple) - def _create_event(self, mkt_event): """ mkt_event is a tuple created according to the format: @@ -142,14 +108,13 @@ def _create_event(self, mkt_event): high_price = PriceParser.parse(mkt_event[3]) low_price = PriceParser.parse(mkt_event[4]) close_price = PriceParser.parse(mkt_event[5]) - adj_close_price = PriceParser.parse(mkt_event[5]) # TODO redundant? + adj_close_price = PriceParser.parse(mkt_event[5]) # TODO redundant? volume = mkt_event[6] return BarEvent( symbol, time, barsize, open_price, high_price, low_price, close_price, volume, adj_close_price ) - def stream_next(self): """ Create the next BarEvent and place it onto the event queue. @@ -157,8 +122,8 @@ def stream_next(self): try: # Create, store and return the bar event. mkt_event = self.bar_stream.get(False) - bev = self._create_event(mkt_event); + bev = self._create_event(mkt_event) self._store_event(bev) self.events_queue.put(bev) - except Queue.Empty: + except queue.Empty: self.continue_backtest = False diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index 73d5d80d..2091ba12 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -1,32 +1,11 @@ -import sys -import argparse -import datetime -import collections -import inspect import queue import threading -import logging -import time -import os.path - +from ibapi import comm from ibapi.wrapper import EWrapper from ibapi.client import EClient -from ibapi.utils import iswrapper - -#types -from ibapi.utils import (current_fn_name, BadMessage) -from ibapi.common import * -from ibapi.order_condition import * -from ibapi.contract import * -from ibapi.order import * -from ibapi.order_state import * -from ibapi.execution import Execution -from ibapi.execution import ExecutionFilter -from ibapi.commission_report import CommissionReport -from ibapi.scanner import ScannerSubscription -from ibapi.ticktype import * - -from ibapi.account_summary_tags import * +from ibapi.common import NO_VALID_ID, MAX_MSG_LEN, TickerId, TagValueList +from ibapi.contract import Contract +from ibapi.errors import BAD_LENGTH class IBService(EWrapper, EClient, threading.Thread): @@ -63,41 +42,37 @@ def __init__(self): self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] - - def error(self, reqId:TickerId, errorCode:int, errorString:str): + def error(self, reqId: TickerId, errorCode: int, errorString: str): super().error(reqId, errorCode, errorString) - print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString) - + print("Error. Id: ", reqId, " Code: ", errorCode, " Msg: ", errorString) """ Append `reqId` to waitingHistoricalData, then call the super method. """ - def reqHistoricalData(self, reqId:TickerId , contract:Contract, endDateTime:str, - durationStr:str, barSizeSetting:str, whatToShow:str, - useRTH:int, formatDate:int, chartOptions:TagValueList): + def reqHistoricalData(self, reqId: TickerId, contract: Contract, endDateTime: str, + durationStr: str, barSizeSetting: str, whatToShow: str, + useRTH: int, formatDate: int, chartOptions: TagValueList): self.waitingHistoricalData.append(reqId) - super().reqHistoricalData( reqId, contract, endDateTime, + super().reqHistoricalData(reqId, contract, endDateTime, durationStr, barSizeSetting, whatToShow, useRTH, formatDate, chartOptions) - """ Populate the HistoricalData queue. """ - def historicalData(self, reqId:TickerId , date:str, open:float, high:float, - low:float, close:float, volume:int, barCount:int, - WAP:float, hasGaps:int): + def historicalData(self, reqId: TickerId, date: str, open: float, high: float, + low: float, close: float, volume: int, barCount: int, + WAP: float, hasGaps: int): self.historicalDataQueue.put((reqId, date, open, high, low, close, - volume, barCount, WAP, hasGaps)) + volume, barCount, WAP, hasGaps)) """ Remove `reqId` from waitingHistoricalData TODO: Will it work with multiple historical requests for same symbol? """ - def historicalDataEnd(self, reqId:int, start:str, end:str): + def historicalDataEnd(self, reqId: int, start: str, end: str): self.waitingHistoricalData.remove(reqId) - """ Overridden from the Threading class. Infinite loop which handles message passing from IB to QSTrader. This loop is run in new thread when @@ -109,11 +84,11 @@ def run(self): text = self.msg_queue.get(block=True, timeout=0.2) if len(text) > MAX_MSG_LEN: self.wrapper.error(NO_VALID_ID, BAD_LENGTH.code(), - "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) + "%s:%d:%s" % (BAD_LENGTH.msg(), len(text), text)) self.disconnect() break except queue.Empty: - pass # TODO something more appropriate + pass # TODO something more appropriate else: fields = comm.read_fields(text) self.decoder.interpret(fields) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 63feedba..454127fe 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -1,14 +1,11 @@ import unittest -import mock -import queue import numpy as np import pandas as pd from qstrader.price_parser import PriceParser from qstrader.price_handler.ib_bar import IBBarPriceHandler from qstrader.compat import queue -from qstrader.service.ib import IBService from qstrader import settings -from ibapi.contract import * +from ibapi.contract import Contract # Starting at 2017-01-01 13:00:00, 1 minute bars. @@ -23,23 +20,22 @@ def __init__(self): self.countHistoricalRequestsMade = 0 # Populate some historic data for the mock service. # CBA mock data - for i in range(0,10): + for i in range(0, 10): self.historicalDataQueue.put((0, timestamp + (i * 60), - closes[i], closes[i]+1, - closes[i]-1, closes[i+1], - 1000000, 100, closes[i], False)) + closes[i], closes[i] + 1, + closes[i] - 1, closes[i + 1], + 1000000, 100, closes[i], False)) # BHP mock data - for i in range(0,10): + for i in range(0, 10): self.historicalDataQueue.put((1, timestamp + (i * 60), - closes[i]/2, closes[i]+1/2, - (closes[i]-1)/2, closes[i+1]/2, - 1000000, 100, closes[i]/2, False)) + closes[i] / 2, closes[i] + 1 / 2, + (closes[i] - 1) / 2, closes[i + 1] / 2, + 1000000, 100, closes[i] / 2, False)) def reqHistoricalData(self, *arg): self.countHistoricalRequestsMade += 1 - class TestPriceHandlerSimpleCase(unittest.TestCase): def setUp(self): """ @@ -52,7 +48,6 @@ def setUp(self): """ self.ib_service = IBServiceMock() self.config = settings.TEST - fixtures_path = self.config.CSV_DATA_DIR events_queue = queue.Queue() # Set up an IB Contract for CBA and BHP @@ -70,10 +65,9 @@ def setUp(self): # Create the price handler. self.price_handler = IBBarPriceHandler( - self.ib_service, events_queue, [cba,bhp], self.config + self.ib_service, events_queue, [cba, bhp], self.config ) - def test_stream_all_historic_events(self): """ Will test that: @@ -86,22 +80,22 @@ def test_stream_all_historic_events(self): self.price_handler.stream_next() self.assertEqual( self.price_handler.tickers["CBA"]["timestamp"], - pd.Timestamp((timestamp + (i*60)) * 1e9) + pd.Timestamp((timestamp + (i * 60)) * 1e9) ) self.assertEqual( PriceParser.display(self.price_handler.tickers["CBA"]["close"]), - closes[i+1] # Close is next open + closes[i + 1] # Close is next open ) # Test BHP self.price_handler.stream_next() self.assertEqual( self.price_handler.tickers["BHP"]["timestamp"], - pd.Timestamp((timestamp + (i*60)) * 1e9) + pd.Timestamp((timestamp + (i * 60)) * 1e9) ) self.assertEqual( PriceParser.display(self.price_handler.tickers["BHP"]["close"]), - closes[i+1]/2 # Close is next open + closes[i + 1] / 2 # Close is next open ) def test_made_historical_requests(self): From eba9654e4f1ba3b4f52225f43d246d449dfcdce5 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sat, 11 Mar 2017 20:46:02 +1100 Subject: [PATCH 26/35] Fix queue compatibility import --- qstrader/price_handler/ib_bar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index 89b6ef72..a1d209ff 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -1,6 +1,6 @@ import datetime -import queue import pandas as pd +from qstrader.compat import queue from .base import AbstractBarPriceHandler from ..event import BarEvent from ..price_parser import PriceParser From fe5f5dff6544c6e53c04f959ef5acb094e08086f Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Sun, 12 Mar 2017 12:22:52 +1100 Subject: [PATCH 27/35] Update a few references. Test cases don't cover live IB unfrotunately -- need a full integration test to run locally --- examples/buy_and_hold_historic_ib.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py index e7959e23..7b7d2f63 100644 --- a/examples/buy_and_hold_historic_ib.py +++ b/examples/buy_and_hold_historic_ib.py @@ -4,7 +4,7 @@ from qstrader.strategy.base import AbstractStrategy from qstrader.event import SignalEvent, EventType from qstrader.compat import queue -from qstrader.trading_session.backtest import Backtest +from qstrader.trading_session import TradingSession from qstrader.service.ib import IBService from qstrader.price_handler.ib_bar import IBBarPriceHandler from ibapi.contract import Contract @@ -78,12 +78,12 @@ def run(config, testing, tickers, filename): end_date = datetime.datetime(2014, 1, 1) # Set up the backtest - backtest = Backtest( + backtest = TradingSession( config, strategy, tickers, initial_equity, start_date, end_date, - events_queue, price_handler, title=title + events_queue, price_handler=price_handler, title=title ) - results = backtest.simulate_trading(testing=testing) + results = backtest.start_trading(testing=testing) # Disconnect from services ib_service.stop_event.set() From 393ae63433ac8fe76ae6da7acc9296df616497fa Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 10:03:02 +1100 Subject: [PATCH 28/35] Started some test cases for live market data requests --- tests/test_ib_price_handler.py | 80 +++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 454127fe..4ae20cc1 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -7,6 +7,14 @@ from qstrader import settings from ibapi.contract import Contract +""" +TODO + * Code repetition in this file + * Swaps between camelCase and snake_case, + largely because IB uses camelCase. Be more consistent? + * Test that the price handler called IBService methods with correct params. +""" + # Starting at 2017-01-01 13:00:00, 1 minute bars. timestamp = 1483275600 @@ -18,6 +26,7 @@ def __init__(self): self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] self.countHistoricalRequestsMade = 0 + self.countMarketDataRequestsMade = 0 # Populate some historic data for the mock service. # CBA mock data for i in range(0, 10): @@ -35,8 +44,77 @@ def __init__(self): def reqHistoricalData(self, *arg): self.countHistoricalRequestsMade += 1 + def reqMktData(self, *arg): + self.countMarketDataRequestsMade += 1 + + +class TestPriceHandlerLiveCase(unittest.TestCase): + def setUp(self): + """ + Set up the PriceHandler object with a small + set of market data for a mocked 'live' trading session. + + TODO: + * Test multiple timeframes + * Test successfully cancels market data feeds + * Test handling of maxing out IB's market data streaming connections + """ + self.ib_service = IBServiceMock() + self.config = settings.TEST + events_queue = queue.Queue() + + # Set up an IB Contract for CBA and BHP + cba = Contract() + cba.exchange = "SMART" + cba.symbol = "CBA" + cba.secType = "STK" + cba.currency = "AUD" + + bhp = Contract() + bhp.exchange = "SMART" + bhp.symbol = "BHP" + bhp.secType = "STK" + bhp.currency = "AUD" + + # Create the price handler. + self.price_handler = IBBarPriceHandler( + self.ib_service, events_queue, [cba, bhp], self.config, mode="live" + ) + + def test_stream_all_live_events(self): + """ + Will test that: + * live data is requested and collected from IBService + * live data is streamed out correctly + """ + for i in range(0, 10): + # Test CBA + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["CBA"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["CBA"]["close"]), + closes[i + 1] # Close is next open + ) + + # Test BHP + self.price_handler.stream_next() + self.assertEqual( + self.price_handler.tickers["BHP"]["timestamp"], + pd.Timestamp((timestamp + (i * 60)) * 1e9) + ) + self.assertEqual( + PriceParser.display(self.price_handler.tickers["BHP"]["close"]), + closes[i + 1] / 2 # Close is next open + ) + + def test_made_market_data_requests(self): + self.assertEqual(self.ib_service.countMarketDataRequestsMade, 2) + -class TestPriceHandlerSimpleCase(unittest.TestCase): +class TestPriceHandlerHistoricCase(unittest.TestCase): def setUp(self): """ Set up the PriceHandler object with a small From c05e93a81cc7313118ae60b5734cf90694ea6d5a Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 10:31:17 +1100 Subject: [PATCH 29/35] Live pricehandler should request properly and stream bars when popualted by IB --- qstrader/price_handler/ib_bar.py | 20 ++++++++++++++++++-- qstrader/service/ib.py | 11 +++++++++++ tests/test_ib_price_handler.py | 20 +++++++++++++++++++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index a1d209ff..fc32a671 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -21,6 +21,7 @@ class IBBarPriceHandler(AbstractBarPriceHandler): * Raise exceptions if the user enters data that IB won't like (i.e. barsize/duration string formats) * Decide/discuss approaches to handle IB's simultaneous data feed limit. + * Decide/discuss on support of live market data (ticks, opposed to bars) """ def __init__( self, ib_service, events_queue, param_contracts, settings, mode="historic", @@ -57,8 +58,11 @@ def __init__( self.contract_lookup = {} for contract in param_contracts: # TODO gross param_contracts -- combine above? self._subscribe_contract(contract) - self._wait_for_hist_population() - self._merge_sort_contract_data() + if self.mode == "historic": + self._wait_for_hist_population() + self._merge_sort_contract_data() + elif self.mode == "live": # Assign a reference to the live bars populated by IB. + self.bar_stream = self.ib_service.realtimeBarQueue def _subscribe_contract(self, contract): """ @@ -66,12 +70,20 @@ def _subscribe_contract(self, contract): """ # Add ticker symbol, as required by some parent methods self.tickers[contract.symbol] = {} + if self.mode == "live": + ib_contract_id = len(self.contracts) + end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") + self.ib_service.reqRealTimeBars( + ib_contract_id, contract, self.ib_barsize, "TRADES", True, None + ) + if self.mode == "historic": ib_contract_id = len(self.contracts) end_time = datetime.datetime.strftime(self.hist_end_date, "%Y%m%d 17:00:00") self.ib_service.reqHistoricalData( ib_contract_id, contract, end_time, self.hist_duration, self.ib_barsize, "TRADES", True, 2, None) + # TODO gross self.contract_lookup[len(self.contracts)] = contract.symbol self.contracts[contract] = {} @@ -88,6 +100,8 @@ def _merge_sort_contract_data(self): Collects all the equities data from thte IBService, and populates the member Queue `self.bar_stream`. This queue is used for the `stream_next()` function. + + Note this is not necessary for live data. """ historicalData = [] while not self.ib_service.historicalDataQueue.empty(): @@ -118,6 +132,8 @@ def _create_event(self, mkt_event): def stream_next(self): """ Create the next BarEvent and place it onto the event queue. + + TODO make more clear if differences between live/historic? """ try: # Create, store and return the bar event. diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index 2091ba12..cd0dd149 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -39,6 +39,7 @@ def __init__(self): threading.Thread.__init__(self, name='IBService') self.stop_event = threading.Event() # Set up data queues. + self.realtimeBarQueue = queue.Queue() self.historicalDataQueue = queue.Queue() self.waitingHistoricalData = [] @@ -57,6 +58,16 @@ def reqHistoricalData(self, reqId: TickerId, contract: Contract, endDateTime: st durationStr, barSizeSetting, whatToShow, useRTH, formatDate, chartOptions) + """ + Populate the RealTimeBars queue. + Note that `time` is the start of the bar + """ + def realtimeBar(self, reqId: TickerId, time:int, open: float, high: float, + low: float, close: float, volume: float, + wap: float, count: int): + self.realtimeBarQueue.put((reqId, time, open, high, low, close, + volume, wap, count)) + """ Populate the HistoricalData queue. """ diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index 4ae20cc1..baf24718 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -24,6 +24,7 @@ class IBServiceMock(object): def __init__(self): self.historicalDataQueue = queue.Queue() + self.realtimeBarQueue = queue.Queue() self.waitingHistoricalData = [] self.countHistoricalRequestsMade = 0 self.countMarketDataRequestsMade = 0 @@ -41,10 +42,25 @@ def __init__(self): (closes[i] - 1) / 2, closes[i + 1] / 2, 1000000, 100, closes[i] / 2, False)) + # Populate mock realtimeBars + for i in range(0, 10): + # CBA + self.realtimeBarQueue.put((0, timestamp + (i * 60), + closes[i], closes[i] + 1, + closes[i] - 1, closes[i + 1], + 1000000, 100, closes[i], False)) + # BHP + self.realtimeBarQueue.put((1, timestamp + (i * 60), + closes[i] / 2, closes[i] + 1 / 2, + (closes[i] - 1) / 2, closes[i + 1] / 2, + 1000000, 100, closes[i] / 2, False)) + + + def reqHistoricalData(self, *arg): self.countHistoricalRequestsMade += 1 - def reqMktData(self, *arg): + def reqRealTimeBars(self, *arg): self.countMarketDataRequestsMade += 1 @@ -58,6 +74,8 @@ def setUp(self): * Test multiple timeframes * Test successfully cancels market data feeds * Test handling of maxing out IB's market data streaming connections + * Test that live can understand 'start' of a bar, but historic + might use 'end' of a bar (RE timestamps)?? """ self.ib_service = IBServiceMock() self.config = settings.TEST From efbf3a5a032921a1e76507134744fbf346c4b51e Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 10:56:26 +1100 Subject: [PATCH 30/35] Live market data working with example --- examples/buy_and_hold_historic_ib.py | 2 +- examples/buy_and_hold_live_ib.py | 105 +++++++++++++++++++++++++++ qstrader/price_handler/ib_bar.py | 4 + qstrader/trading_session.py | 4 + 4 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 examples/buy_and_hold_live_ib.py diff --git a/examples/buy_and_hold_historic_ib.py b/examples/buy_and_hold_historic_ib.py index 7b7d2f63..295d5177 100644 --- a/examples/buy_and_hold_historic_ib.py +++ b/examples/buy_and_hold_historic_ib.py @@ -67,7 +67,7 @@ def run(config, testing, tickers, filename): end_date = datetime.datetime.now() - datetime.timedelta(days=1) price_handler = IBBarPriceHandler( ib_service, events_queue, contracts, config, - "historic", end_date, hist_duration="5 D", hist_barsize="1 min" + "historic", end_date, hist_duration="5 D", barsize="1 min" ) # Use the Buy and Hold Strategy diff --git a/examples/buy_and_hold_live_ib.py b/examples/buy_and_hold_live_ib.py new file mode 100644 index 00000000..a298d3bc --- /dev/null +++ b/examples/buy_and_hold_live_ib.py @@ -0,0 +1,105 @@ +import datetime + +from qstrader import settings +from qstrader.strategy.base import AbstractStrategy +from qstrader.event import SignalEvent, EventType +from qstrader.compat import queue +from qstrader.trading_session import TradingSession +from qstrader.service.ib import IBService +from qstrader.price_handler.ib_bar import IBBarPriceHandler +from ibapi.contract import Contract + + +class BuyAndHoldStrategy(AbstractStrategy): + """ + A testing strategy that simply purchases (longs) any asset that + matches what was passed in on initialization and + then holds until the completion of a backtest. + """ + def __init__( + self, tickers, events_queue, + base_quantity=100 + ): + self.tickers = tickers + self.invested = dict.fromkeys(tickers) + self.events_queue = events_queue + self.base_quantity = base_quantity + + def calculate_signals(self, event): + if ( + event.type in [EventType.BAR, EventType.TICK] and + event.ticker in self.tickers + ): + if not self.invested[event.ticker]: + signal = SignalEvent( + event.ticker, "BOT", + suggested_quantity=self.base_quantity + ) + self.events_queue.put(signal) + self.invested[event.ticker] = True + + +def run(config, testing, tickers, filename): + # Backtest information + title = ['Buy and Hold Live IB Example -- 5 Sec Bars'] + initial_equity = 10000.0 + events_queue = queue.Queue() + + # Set up IBService + ib_service = IBService() + ib_service.connect("127.0.0.1", 4001, 0) # TODO from config + ib_service.start() + + # Set up IB Contract objects for the PriceHandler + # MORE INFO: https://www.interactivebrokers.com/en/?f=%2Fen%2Fgeneral%2Fcontact%2FtipsContractsDatabaseSearch.php%3Fib_entity%3Dllc + symbols = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + contracts = [] + for symbol in symbols: + contract = Contract() + contract.exchange = "SMART" + contract.symbol = symbol + contract.secType = "STK" + contract.currency = "AUD" + contracts.append(contract) + + # Set up the IB PriceHandler. Want 5 day's of minute bars, up to yesterday. + # Look at IB Documentation for possible values. + end_date = datetime.datetime.now() - datetime.timedelta(days=1) + price_handler = IBBarPriceHandler( + ib_service, events_queue, contracts, config, + "live" + ) + + # Use the Buy and Hold Strategy + strategy = BuyAndHoldStrategy(tickers, events_queue) + + # Start/End TODO redundant -- only required for default (Yahoo) price handler. + start_date = datetime.datetime(2000, 1, 1) + end_date = datetime.datetime(2014, 1, 1) + + # Set up the backtest + session = TradingSession( + config, strategy, tickers, + initial_equity, start_date, end_date, + events_queue, session_type="live", + end_session_time=datetime.datetime.now() + datetime.timedelta(minutes=1), + price_handler=price_handler, title=title + ) + results = session.start_trading(testing=testing) + + # Disconnect from services + ib_service.stop_event.set() + ib_service.join() + + return results + + +if __name__ == "__main__": + # Configuration data + testing = False + config = settings.from_file( + settings.DEFAULT_CONFIG_FILENAME, testing + ) + tickers = ["CBA", "BHP", "STO", "FMG", "WOW", "WES"] + filename = None + run(config, testing, tickers, filename) diff --git a/qstrader/price_handler/ib_bar.py b/qstrader/price_handler/ib_bar.py index fc32a671..42c4d3db 100644 --- a/qstrader/price_handler/ib_bar.py +++ b/qstrader/price_handler/ib_bar.py @@ -53,6 +53,10 @@ def __init__( self.hist_duration = hist_duration self.qst_barsize = self.barsize_lookup[hist_barsize] self.ib_barsize = hist_barsize + # IB Only supports `5` as an int, for live barsize. + if self.mode == "live": + self.ib_barsize = 5 + # The position of a contract in this dict is used as its IB ID. self.contracts = {} # TODO gross self.contract_lookup = {} diff --git a/qstrader/trading_session.py b/qstrader/trading_session.py index 59ba5903..567d536f 100644 --- a/qstrader/trading_session.py +++ b/qstrader/trading_session.py @@ -16,6 +16,9 @@ class TradingSession(object): """ Enscapsulates the settings and components for carrying out either a backtest or live trading session. + + TODO logic leak from here/pricehandler with live & end_session_time, + code smell. I.e. we set end_time differently when live vs historic. """ def __init__( self, config, strategy, tickers, @@ -51,6 +54,7 @@ def __init__( self.session_type = session_type self._config_session() self.cur_time = None + self.end_session_time = end_session_time if self.session_type == "live": if self.end_session_time is None: From 1fd72aafccf0865267202e288d7ade7bd50e1ae5 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 11:07:58 +1100 Subject: [PATCH 31/35] Add displayStrategy for live trading example to see market data coming in --- examples/buy_and_hold_live_ib.py | 35 ++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/examples/buy_and_hold_live_ib.py b/examples/buy_and_hold_live_ib.py index a298d3bc..b810bd69 100644 --- a/examples/buy_and_hold_live_ib.py +++ b/examples/buy_and_hold_live_ib.py @@ -1,7 +1,8 @@ import datetime +from qstrader.price_parser import PriceParser from qstrader import settings -from qstrader.strategy.base import AbstractStrategy +from qstrader.strategy.base import Strategies, AbstractStrategy from qstrader.event import SignalEvent, EventType from qstrader.compat import queue from qstrader.trading_session import TradingSession @@ -9,6 +10,35 @@ from qstrader.price_handler.ib_bar import IBBarPriceHandler from ibapi.contract import Contract +class DisplayStrategy(AbstractStrategy): + """ + A strategy which display ticks / bars + params: + n = 10000 + n_window = 5 + """ + def __init__(self, n=100, n_window=5): + self.n = n + self.n_window = n_window + self.i = 0 + + def calculate_signals(self, event): + if event.type in [EventType.TICK, EventType.BAR]: + # Format the event for human display + if event.type == EventType.BAR: + event.open_price = PriceParser.display(event.open_price) + event.high_price = PriceParser.display(event.high_price) + event.low_price = PriceParser.display(event.low_price) + event.close_price = PriceParser.display(event.close_price) + event.adj_close_price = PriceParser.display(event.adj_close_price) + else: # event.type == EventType.TICK + event.bid = PriceParser.display(event.bid) + event.ask = PriceParser.display(event.ask) + + if self.i % self.n in range(self.n_window): + print("%d %s" % (self.i, event)) + self.i += 1 + class BuyAndHoldStrategy(AbstractStrategy): """ @@ -71,7 +101,8 @@ def run(config, testing, tickers, filename): ) # Use the Buy and Hold Strategy - strategy = BuyAndHoldStrategy(tickers, events_queue) + strategy = Strategies(BuyAndHoldStrategy(tickers, events_queue), DisplayStrategy(n=20)) + # Start/End TODO redundant -- only required for default (Yahoo) price handler. start_date = datetime.datetime(2000, 1, 1) From 8c4f8227f199c1fa1543d65a8b5591fd03c0285c Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 11:36:07 +1100 Subject: [PATCH 32/35] Linting --- examples/buy_and_hold_live_ib.py | 4 ++-- qstrader/service/ib.py | 2 +- tests/test_ib_price_handler.py | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/buy_and_hold_live_ib.py b/examples/buy_and_hold_live_ib.py index b810bd69..0a13a1ce 100644 --- a/examples/buy_and_hold_live_ib.py +++ b/examples/buy_and_hold_live_ib.py @@ -10,6 +10,7 @@ from qstrader.price_handler.ib_bar import IBBarPriceHandler from ibapi.contract import Contract + class DisplayStrategy(AbstractStrategy): """ A strategy which display ticks / bars @@ -103,7 +104,6 @@ def run(config, testing, tickers, filename): # Use the Buy and Hold Strategy strategy = Strategies(BuyAndHoldStrategy(tickers, events_queue), DisplayStrategy(n=20)) - # Start/End TODO redundant -- only required for default (Yahoo) price handler. start_date = datetime.datetime(2000, 1, 1) end_date = datetime.datetime(2014, 1, 1) @@ -113,7 +113,7 @@ def run(config, testing, tickers, filename): config, strategy, tickers, initial_equity, start_date, end_date, events_queue, session_type="live", - end_session_time=datetime.datetime.now() + datetime.timedelta(minutes=1), + end_session_time=datetime.datetime.now() + datetime.timedelta(minutes=10), price_handler=price_handler, title=title ) results = session.start_trading(testing=testing) diff --git a/qstrader/service/ib.py b/qstrader/service/ib.py index cd0dd149..48cb8f9b 100644 --- a/qstrader/service/ib.py +++ b/qstrader/service/ib.py @@ -62,7 +62,7 @@ def reqHistoricalData(self, reqId: TickerId, contract: Contract, endDateTime: st Populate the RealTimeBars queue. Note that `time` is the start of the bar """ - def realtimeBar(self, reqId: TickerId, time:int, open: float, high: float, + def realtimeBar(self, reqId: TickerId, time: int, open: float, high: float, low: float, close: float, volume: float, wap: float, count: int): self.realtimeBarQueue.put((reqId, time, open, high, low, close, diff --git a/tests/test_ib_price_handler.py b/tests/test_ib_price_handler.py index baf24718..ac671c46 100644 --- a/tests/test_ib_price_handler.py +++ b/tests/test_ib_price_handler.py @@ -55,8 +55,6 @@ def __init__(self): (closes[i] - 1) / 2, closes[i + 1] / 2, 1000000, 100, closes[i] / 2, False)) - - def reqHistoricalData(self, *arg): self.countHistoricalRequestsMade += 1 From 8c520558f8c15c3b4ff53f8cb0adff9ce453cb30 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 11:47:45 +1100 Subject: [PATCH 33/35] Try to install ibapi python module manually in Travis build --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index bc6d7cd7..733e87e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,11 @@ env: # - env: PYTHON=2.7 PANDAS=0.12.0 # - env: PYTHON=2.7 PANDAS=0.11.0 +before_install: + - wget http://interactivebrokers.github.io/downloads/twsapi_macunix.973.02.zip + - unzip twsapi_macunix.973.02.zip + - pip install IBJts/source/pythonclient/ + install: - pip install -qq flake8 # You may want to periodically update this, although the conda update From 55d62d09ce6abb6f09626bd31caa632816ced822 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 11:59:53 +1100 Subject: [PATCH 34/35] Modifying travis build --- .travis.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 733e87e5..4f0241f3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,12 +13,11 @@ env: # - env: PYTHON=2.7 PANDAS=0.12.0 # - env: PYTHON=2.7 PANDAS=0.11.0 -before_install: +install: - wget http://interactivebrokers.github.io/downloads/twsapi_macunix.973.02.zip - unzip twsapi_macunix.973.02.zip - pip install IBJts/source/pythonclient/ -install: - pip install -qq flake8 # You may want to periodically update this, although the conda update # conda line below will keep everything up-to-date. We do this From bc3e34722d77e3aff3f53d373d67a1f9c0c90c00 Mon Sep 17 00:00:00 2001 From: Ryan Kennedy Date: Wed, 15 Mar 2017 12:08:37 +1100 Subject: [PATCH 35/35] Modifying travis build --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4f0241f3..7d88a61b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,11 @@ sudo: false language: python +python: + - "3.4" + - "3.5" env: - - PYTHON=2.7 PANDAS=0.18.0 - PYTHON=3.4 PANDAS=0.18.0 - PYTHON=3.5 PANDAS=0.18.0