Skip to content

Commit

Permalink
Improved QuestDB queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Aug 23, 2023
1 parent 96c1ecb commit 2ff24b0
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.roboquant.questdb

import io.questdb.cairo.CairoEngine
import io.questdb.cairo.DefaultCairoConfiguration
import io.questdb.griffin.SqlException
import org.roboquant.common.*
import org.roboquant.loggers.MetricsLogger
import java.nio.file.Files
Expand Down Expand Up @@ -79,11 +80,13 @@ class QuestDBMetricsLogger(dbPath: Path = Config.home / "questdb-metrics" / "db"
*/
override fun getMetric(metricName: String, run: String): TimeSeries {
val result = mutableListOf<Observation>()
engine.query("select * from '$run' where metric='$metricName'") {
while (hasNext()) {
val r = this.record
val o = Observation(ofEpochMicro(r.getTimestamp(1)), r.getDouble(0))
result.add(o)
if (tables.contains(run)) {
engine.query("select time, value from '$run' where metric='$metricName'") {
while (hasNext()) {
val r = this.record
val o = Observation(ofEpochMicro(r.getTimestamp(0)), r.getDouble(1))
result.add(o)
}
}
}
return TimeSeries(result)
Expand All @@ -102,7 +105,11 @@ class QuestDBMetricsLogger(dbPath: Path = Config.home / "questdb-metrics" / "db"
}

override fun start(run: String, timeframe: Timeframe) {
// engine.update("drop table $run")
try {
engine.dropTable(run)
} catch (e: SqlException) {
logger.error(e) { "error with drop table $run"}
}
tables.remove(run)
}

Expand All @@ -111,24 +118,42 @@ class QuestDBMetricsLogger(dbPath: Path = Config.home / "questdb-metrics" / "db"
return engine.distictSymbol(run, "name").toSortedSet()
}

/**
* Remove all runs from the database, both current and past runs.
* Under the hood, this will drop all the tables in the database.
*/
fun removeAllRuns() {
engine.dropAllTables()
tables.clear()
logger.info { "removed all runs from ${engine.configuration.root}" }
}

override val runs: Set<String>
get() = engine.tables().toSet()

private fun createTable(name: String) {
private fun createTable(tableName: String) {
engine.update(
"""CREATE TABLE IF NOT EXISTS '$name' (
"""CREATE TABLE IF NOT EXISTS '$tableName' (
|metric SYMBOL,
|value DOUBLE,
|time TIMESTAMP
|) timestamp(time)""".trimMargin(),
|), INDEX(metric) timestamp(time)""".trimMargin(),
)

engine.update("TRUNCATE TABLE '$name'")
engine.update("TRUNCATE TABLE '$tableName'")
}

fun close() {
engine.close()
}

}
/**
* Reset the state.
* It doesn't remove the underlying tables.
* Use [removeAllRuns] for that.
*/
override fun reset() {
tables.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ import kotlin.io.path.isDirectory
* - Fast random access
* - Limited to a single [PriceAction] type per table
*
* @param dbPath the path to use for the database.
* If it doesn't exist yet, it will be created.
* The default value is `~/.roboquant/questdb-prices/db`
*
*/
class QuestDBRecorder(dbPath: Path = Config.home / "questdb-prices" / "db") {

Expand All @@ -67,6 +71,16 @@ class QuestDBRecorder(dbPath: Path = Config.home / "questdb-prices" / "db") {
const val HOUR = "HOUR"
}

/**
* Remove all the feeds from the database. This cannot be undone, so use this method with care.
*/
fun removeAllFeeds() {
CairoEngine(config).use {
it.dropAllTables()
logger.info { "Dropped all feeds in db=${config.root}" }
}
}


/**
* Generate a new QuestDB table based on the event in the feed and optional limited to the provided timeframe
Expand All @@ -76,8 +90,9 @@ class QuestDBRecorder(dbPath: Path = Config.home / "questdb-prices" / "db") {
* @param tableName the table to use to store the data
* @param timeframe the timeframe
* @param append do you want to append to an existing table, default is false
* @param partition partition table by specified value. This is required when wanting to insert timestamps
* out of order and might result in better overall performance. The default value is [NONE]
* @param partition partition the table using the specified value.
* This is required when wanting to append timestamps out of order and might result in better overall performance.
* The default value is [NONE]
*/
inline fun <reified T : PriceAction> record(
feed: Feed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ internal fun CairoEngine.tables(): Set<String> {
return result
}


internal fun CairoEngine.dropAllTables() {
update("DROP ALL TABLES")
}

internal fun CairoEngine.dropTable(tableName: String) {
update("DROP TABLE IF EXISTS '$tableName'")
}

internal fun CairoEngine.tableColumns(tableName: String): Set<String> {
val result = mutableSetOf<String>()
query("select column from table_columns('$tableName')") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class QuestDBFeedTest {

assertEquals(inputFeed.assets, outputFeed.assets)
assertEquals(inputFeed.timeline.timeframe, outputFeed.timeframe)
outputFeed.close()
}


Expand All @@ -42,6 +43,7 @@ internal class QuestDBFeedTest {

assertEquals(inputFeed.assets, outputFeed.assets)
assertEquals(inputFeed.timeline.timeframe, outputFeed.timeframe)
outputFeed.close()
}

@Test
Expand All @@ -57,9 +59,9 @@ internal class QuestDBFeedTest {
recorder.record<PriceBar>(feed2, "pricebars3", append = true)

val outputFeed = QuestDBFeed("pricebars3", folder.toPath())

assertEquals(feed1.assets + feed2.assets, outputFeed.assets)
assertEquals(feed1.timeline.timeframe, outputFeed.timeframe)
outputFeed.close()
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class QuestDBMetricsLoggerTest {
val aaa2 = logger2.getMetric("aaa", "myrun")
assertEquals(1, aaa2.size)
logger2.close()

val logger3 = QuestDBMetricsLogger(folder.toPath())
logger3.removeAllRuns()
logger3.loadPreviousRuns()
val aaa3 = logger3.getMetric("aaa", "myrun")
assertEquals(0, aaa3.size)
logger3.close()
}

}
37 changes: 23 additions & 14 deletions roboquant-questdb/src/test/kotlin/org/roboquant/samples/main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,36 @@ package org.roboquant.samples

import org.roboquant.Roboquant
import org.roboquant.brokers.sim.SimBroker
import org.roboquant.common.*
import org.roboquant.common.ParallelJobs
import org.roboquant.common.Timeframe
import org.roboquant.common.months
import org.roboquant.common.seconds
import org.roboquant.feeds.PriceBar
import org.roboquant.feeds.filter
import org.roboquant.feeds.random.RandomWalkFeed
import org.roboquant.loggers.LastEntryLogger
import org.roboquant.metrics.AccountMetric
import org.roboquant.questdb.QuestDBFeed
import org.roboquant.questdb.QuestDBMetricsLogger
import org.roboquant.questdb.QuestDBRecorder
import org.roboquant.strategies.EMAStrategy
import kotlin.system.measureTimeMillis


const val TABLE_NAME = "pricebars"
private const val TABLE_NAME = "pricebars"

private fun <T>printTimeMillis(key: String, block: () -> T) : T {
val result: T
val t = measureTimeMillis { result = block() }
println("$key time=$t")
println("$key time=${t}ms")
return result
}

fun create() {
private fun create() {
val f = RandomWalkFeed(Timeframe.past(12.months), nAssets = 3, timeSpan = 1.seconds)

printTimeMillis("create feed") {
printTimeMillis("create feed") {
val g = QuestDBRecorder()
g.removeAllFeeds()
g.record<PriceBar>(f, TABLE_NAME )
}
}
Expand All @@ -63,8 +67,8 @@ private fun read() {
println(f.timeframe)
}

printTimeMillis("iterate 1 year") {
f.filter<PriceBar>(Timeframe.past(1.years)) {
printTimeMillis("iterate 3 months") {
f.filter<PriceBar>(Timeframe.past(3.months)) {
false
}
}
Expand All @@ -73,21 +77,26 @@ private fun read() {
}


fun backTest() {
private fun backTest() {
val feed = QuestDBFeed(TABLE_NAME)
val jobs = ParallelJobs()
val logger = LastEntryLogger(false)
val logger = QuestDBMetricsLogger()
logger.removeAllRuns()
feed.timeframe.split(1.months).forEach { tf ->
jobs.add {
println("starting $tf")
val broker = SimBroker(limitTracking = true) // Optimized for large data
val run = "run-${tf.toPrettyString()}"
println("starting run=$run")
val broker = SimBroker(limitTracking = true) // Set to true to optimize for performance
val rq = Roboquant(EMAStrategy(), AccountMetric(), broker = broker, logger = logger)
rq.runAsync(feed, tf)
println("done $tf")
rq.runAsync(feed, tf, name = run)
println("done run=$run")
}
}

jobs.joinAllBlocking()

feed.close()
logger.close()
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import org.roboquant.feeds.Event
/**
* Capture the following high-level statistics about the [Account]:
*
* - `account.orders` Total number of orders, open and closed together
* - `account.trades` Total number of trades
* - `account.openorders` Total number of open orders,
* - `account.positions` Total number of open positions
* - `account.cash` Total cash value
* - `account.buyingpower` Buying power available
Expand All @@ -41,8 +40,7 @@ class AccountMetric : Metric {
override fun calculate(account: Account, event: Event): Map<String, Double> {

return metricResultsOf(
"account.orders" to account.openOrders.size + account.closedOrders.size,
"account.trades" to account.trades.size,
"account.openorders" to account.openOrders.size,
"account.positions" to account.positions.size,
"account.cash" to account.convert(account.cash, event.time).value,
"account.buyingpower" to account.buyingPower.value,
Expand Down
49 changes: 20 additions & 29 deletions roboquant/src/main/kotlin/org/roboquant/metrics/AlphaBetaMetric.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,13 @@ import org.roboquant.feeds.Event
* @property period Over how many events to calculate the beta
* @property priceType The type of price to use, default is "DEFAULT"
* @property riskFreeReturn the risk-free return, 1% is 0.01. Default is 0.0
* @property onlyAfterInitialTrade should we only start measuring after an initial trade has been executed, default is
* false
* @constructor
*/
class AlphaBetaMetric(
private val referenceAsset: Asset,
private val period: Int,
private val priceType: String = "DEFAULT",
private val riskFreeReturn: Double = 0.0,
private val onlyAfterInitialTrade: Boolean = false
) : Metric {

private val marketData = PriceSeries(period + 1)
Expand All @@ -58,38 +55,32 @@ class AlphaBetaMetric(
* Based on the provided [account] and [event], calculate any metrics and return them.
*/
override fun calculate(account: Account, event: Event): Map<String, Double> {
val action = event.prices[referenceAsset]
val action = event.prices[referenceAsset] ?: return emptyMap()

// Can we already start recording the measures, or do we have to wait for
// an initial trade?
val start = !onlyAfterInitialTrade || account.trades.isNotEmpty()
val price = action.getPrice(priceType)
marketData.add(price)

if (action !== null && start) {
val price = action.getPrice(priceType)
marketData.add(price)
val equity = account.equity
val value = account.convert(equity, time = event.time).value
portfolioData.add(value)

val equity = account.equity
val value = account.convert(equity, time = event.time).value
portfolioData.add(value)
if (marketData.isFull() && portfolioData.isFull()) {
val x1 = marketData.toDoubleArray()
val x2 = portfolioData.toDoubleArray()

if (marketData.isFull() && portfolioData.isFull()) {
val x1 = marketData.toDoubleArray()
val x2 = portfolioData.toDoubleArray()
val marketReturns = x1.returns()
val portfolioReturns = x1.returns()

val marketReturns = x1.returns()
val portfolioReturns = x1.returns()
val covariance = Covariance().covariance(portfolioReturns, marketReturns)
val variance = Variance().evaluate(marketReturns)
val beta = covariance / variance

val covariance = Covariance().covariance(portfolioReturns, marketReturns)
val variance = Variance().evaluate(marketReturns)
val beta = covariance / variance

val alpha =
(x1.totalReturn() - riskFreeReturn) - beta * (x2.totalReturn() - riskFreeReturn)
return mapOf(
"account.alpha" to alpha,
"account.beta" to beta
)
}
val alpha =
(x1.totalReturn() - riskFreeReturn) - beta * (x2.totalReturn() - riskFreeReturn)
return mapOf(
"account.alpha" to alpha,
"account.beta" to beta
)
}
return emptyMap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ open class FlexPolicy(
private operator fun Collection<Order>.contains(asset: Asset) = any { it.asset == asset }

/**
* Record basic metrics: `actions`, `signals`, `orders.new`, `orders.open`, `orders.closed`,
* Record basic metrics: `actions`, `signals`, `orders.new`, `orders.open`,
* `positions` and `buyingpower`.
*
* The main purpose is to better understand when the policy is not behaving as expected.
Expand All @@ -265,7 +265,6 @@ open class FlexPolicy(
record("signals", signals.size)
record("orders.new", orders.size)
record("orders.open", account.openOrders.size)
record("orders.closed", account.closedOrders.size)
record("positions", account.positions.size)
record("buyingpower", account.buyingPower.value)
}
Expand Down
Loading

0 comments on commit 2ff24b0

Please sign in to comment.