Commit

New AvroFeed impl

jbaron committed May 25, 2024
1 parent cd69b21 commit 769f1aa
Showing 41 changed files with 318 additions and 2,475 deletions.
pom.xml: 1 change (0 additions, 1 deletion)
@@ -56,7 +56,6 @@
<module>roboquant-avro</module>
<module>roboquant-charts</module>
<module>roboquant-ta</module>
<module>roboquant-ml</module>
<module>roboquant-jupyter</module>
<module>roboquant-alphavantage</module>
<module>roboquant-alpaca</module>
roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt: 304 changes (163 additions, 141 deletions)
@@ -16,107 +16,85 @@

package org.roboquant.avro

import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.avro.Schema
import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileConstants
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumWriter
import org.apache.avro.util.Utf8
import org.roboquant.common.*
import org.roboquant.common.Asset
import org.roboquant.common.Logging
import org.roboquant.common.Timeframe
import org.roboquant.common.compareTo
import org.roboquant.feeds.*
import java.io.InputStream
import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption
import java.time.Instant
import java.util.*
import kotlin.io.path.isRegularFile

internal const val SCHEMA = """{
"namespace": "org.roboquant.avro.schema",
"type": "record",
"name": "PriceItemV2",
"fields": [
{"name": "timestamp_nanos", "type" : "long"},
{"name": "symbol", "type": "string"},
{"name": "type", "type": { "type": "enum", "name": "item_type", "symbols" : ["BAR", "TRADE", "QUOTE", "BOOK"]}},
{"name": "values", "type": {"type": "array", "items" : "double"}},
{"name": "other", "type": ["null", "string"], "default": null}
]
}"""

/**
* Read price data from a single file in Avro format. This feed loads data lazily and disposes of it afterwards, so
* the memory footprint stays low. Compared to CSV files, Avro files are parsed more efficiently, making them a good
* fit for large back tests. Additionally, an Avro file can be compressed, reducing the disk space required.
*
* When the feed is instantiated, it will create an internal index for faster random access. The internal
* resolution is nanoseconds (previously milliseconds), stored as a single Long value. A usage sketch follows
* the class body below.
*
* @property path the path where the Avro file can be found
* @property template template to use to convert the stored symbols into assets
*
* @constructor Create new Avro Feed
*/
class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMPLATE")) : Feed {

/**
* Instantiate an Avro Feed based on the Avro file at [path]
*/
constructor(path: String) : this(Path.of(path))

/**
* Contains mapping of a serialized Asset string to an Asset
*/
private val assetLookup: Map<String, Asset>
private val logger = Logging.getLogger(AvroFeed::class)

/**
* MetadataProvider that holds time/position pairs for quicker access to rows
* in the Avro file.
*/
private val index: List<Pair<Instant, Long>>

/**
* @see Feed.timeframe
*/
override val timeframe: Timeframe
private val index by lazy { createIndex() }

/**
* Get available assets.
*/
override val assets: SortedSet<Asset>
get() = assetLookup.values.toSortedSet()
override val timeframe: Timeframe by lazy { calcTimeframe() }


init {
assert(path.isRegularFile()) { "$path is not a file" }
val metadataProvider = MetadataProvider(path)

val metadata = metadataProvider.build(useCache)
this.index = metadata.index
timeframe = metadata.timeframe
assetLookup = metadata.assets

logger.info { "loaded feed with timeframe=$timeframe" }
logger.info { "New AvroFeed path=$path exist=${exists()}" }
}

private fun position(r: DataFileReader<GenericRecord>, time: Instant) {
val idx = index.binarySearch { it.first.compareTo(time) }
when {
idx > 0 -> r.seek(index[idx - 1].second)
idx < -1 -> r.seek(index[-idx - 2].second)
}
}

fun exists(): Boolean = Files.exists(path)

private fun getReader(): DataFileReader<GenericRecord> {
return DataFileReader(path.toFile(), GenericDatumReader())
}

/**
* Convert a generic Avro record to a [PriceItem]
*/
private fun recToPriceAction(rec: GenericRecord, serializer: PriceActionSerializer): PriceItem {
val assetStr = rec.get(1).toString()
val asset = assetLookup.getValue(assetStr)
val actionType = rec.get(2) as Int

@Suppress("UNCHECKED_CAST")
val values = rec.get(3) as List<Double>

return if (rec.hasField("other")) {
val other = rec.get("other") as Utf8?
serializer.deserialize(asset, actionType, values, other?.toString())
} else {
serializer.deserialize(asset, actionType, values, null)
}
private fun ofEpochNano(value: Long): Instant {
return if (value >= 0L)
Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L)
else
Instant.ofEpochSecond(value / 1_000_000_000L, -value % 1_000_000_000L)
}

/**
Expand All @@ -128,116 +106,160 @@ class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
override suspend fun play(channel: EventChannel) {
val timeframe = channel.timeframe
var last = Instant.MIN
var actions = ArrayList<PriceItem>()
var items = ArrayList<PriceItem>()
val serializer = PriceActionSerializer()

val cache = mutableMapOf<String, Asset>()
getReader().use {
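// Fast-forward to the start of the requested timeframe using the index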
position(it, timeframe.start)

if (timeframe.isFinite()) position(it, timeframe.start)
while (it.hasNext()) {
val rec = it.next()

// Avoid unnecessary parsing of the whole record
val now = Instant.ofEpochMilli(rec[0] as Long)
val now = ofEpochNano(rec[0] as Long)
if (now < timeframe) continue

if (now != last) {
channel.sendNotEmpty(Event(last, actions))
channel.sendNotEmpty(Event(last, items))
last = now
actions = ArrayList<PriceItem>(actions.size)
items = ArrayList<PriceItem>(items.size)
}

if (now > timeframe) break

val action = recToPriceAction(rec, serializer)
actions.add(action)
// Parse the remaining attributes
val symbol = rec.get(1).toString()
val asset = cache.getOrPut(symbol) { template.copy(symbol = symbol) }
val priceItemType = PriceItemType.valueOf(rec.get(2).toString())

@Suppress("UNCHECKED_CAST")
val values = rec.get(3) as List<Double>
val other = rec.get("other") as Utf8?
val item = serializer.deserialize(asset, priceItemType, values, other?.toString())
items.add(item)
}
channel.sendNotEmpty(Event(last, items))
}
}

private fun position(r: DataFileReader<GenericRecord>, time: Instant) {
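// Find the last indexed block starting at or before the requested time and jump there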
val key = index.floorKey(time)
if (key != null) r.seek(index.getValue(key))
}

private fun createIndex() : TreeMap<Instant, Long> {
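// For each block: remember its file position (tell), peek at the first record's timestamp,
// seek back, and then skip ahead a whole block at a time (nextBlock)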
val index = TreeMap<Instant, Long>()
getReader().use {
while (it.hasNext()) {
val position = it.tell()
val t = ofEpochNano(it.next().get(0) as Long)
it.seek(position)
if (it.hasNext()) {
index.putIfAbsent(t,position)
it.nextBlock()
}
}
}
return index
}

private fun calcTimeframe() : Timeframe {
if (index.isEmpty()) return Timeframe.EMPTY
val start = index.firstKey()
getReader().use {
position(it, index.lastKey())
var timestamp = index.lastKey().toEpochNano()
while (it.hasNext()) {
timestamp = it.next().get(0) as Long
}
channel.sendNotEmpty(Event(last, actions))
return Timeframe(start, ofEpochNano(timestamp), true)
}
}

private fun Instant.toEpochNano(): Long {
var currentTimeNano = epochSecond * 1_000_000_000L
currentTimeNano += if (currentTimeNano > 0) nano else -nano
return currentTimeNano
}
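
The two conversions above use an unusual Long encoding for instants before the epoch, but they are mutually consistent. A standalone sketch (an editor's copy of the private helpers, not part of the commit) that checks the round trip:

import java.time.Instant

fun nanoOf(value: Long): Instant =
    if (value >= 0L) Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L)
    else Instant.ofEpochSecond(value / 1_000_000_000L, -value % 1_000_000_000L)

fun nanoValue(i: Instant): Long {
    var nanos = i.epochSecond * 1_000_000_000L
    nanos += if (nanos > 0) i.nano.toLong() else -i.nano.toLong()
    return nanos
}

fun main() {
    for (s in listOf("2024-05-25T00:00:00.000000001Z", "1969-12-31T23:59:58.5Z")) {
        val i = Instant.parse(s)
        check(nanoOf(nanoValue(i)) == i) // the round trip holds on either side of the epoch
    }
}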

/**
* Standard set of Avro feeds that come with roboquant and will be downloaded the first time they are invoked. They
* are stored at <User.Home>/.roboquant and reused from there later on.
* Record the price-actions in a [feed] and store them in an Avro file that can later be used as input for
* an AvroFeed. Any [Feed] implementation can be recorded.
*
* [compress] can be enabled, which results in a smaller file. The `snappy` compression codec is used, which
* achieves a decent compression ratio while keeping CPU usage limited.
*
* Additionally, you can filter on a [timeframe]. The default is to apply no filtering.
*/
companion object {

internal val logger = Logging.getLogger(AvroFeed::class)
private const val SP500FILE = "sp500_pricebar_v6.0.avro"
private const val SP500QUOTEFILE = "sp500_pricequote_v5.0.avro"
private const val FOREXFILE = "forex_pricebar_v5.1.avro"

/**
* Get an AvroFeed containing end-of-day [PriceBar] data for the companies listed in the S&P 500. This feed
* contains a few years of public data.
*
* Please note that not all US exchanges are included, so the prices are not 100% accurate.
*/
fun sp500(): AvroFeed {
val path = download(SP500FILE)
return AvroFeed(path)
@Suppress("LongParameterList")
fun record(
feed: Feed,
compress: Boolean = true,
timeframe: Timeframe = Timeframe.INFINITE,
append: Boolean = false,
syncInterval: Int = DataFileConstants.DEFAULT_SYNC_INTERVAL
) = runBlocking {

val channel = EventChannel(timeframe = timeframe)
val schema = Schema.Parser().parse(SCHEMA)
val datumWriter: DatumWriter<GenericRecord> = GenericDatumWriter(schema)
val dataFileWriter = DataFileWriter(datumWriter)
val file = path.toFile()

if (append) {
require(exists()) {"File $file doesn't exist yet, cannot append"}
dataFileWriter.appendTo(file)
} else {
if (compress) dataFileWriter.setCodec(CodecFactory.snappyCodec())
dataFileWriter.setSyncInterval(syncInterval)
dataFileWriter.create(schema, file)
}

/**
* Get an AvroFeed containing [PriceQuote] data for the companies listed in the S&P 500. This feed contains
* a few minutes of public data.
*
* Please note that not all US exchanges are included, so the prices are not 100% accurate.
*/
fun sp500Quotes(): AvroFeed {
val path = download(SP500QUOTEFILE)
return AvroFeed(path)
val job = launch {
feed.play(channel)
channel.close()
}

/**
* Get an AvroFeed containing one-minute [PriceBar] data for the EUR/USD currency pair.
*/
fun forex(): AvroFeed {
val path = download(FOREXFILE)
return AvroFeed(path)
}
val arraySchema = Schema.createArray(Schema.create(Schema.Type.DOUBLE))
val enumSchema = Schema.createArray(Schema.create(Schema.Type.STRING))
try {
val record = GenericData.Record(schema)
val serializer = PriceActionSerializer()

while (true) {
val event = channel.receive()
val now = event.time.toEpochNano()

for (action in event.items.filterIsInstance<PriceItem>()) {

val asset = action.asset
record.put(0, now)
record.put(1, asset.symbol)

/**
* Download a data file if not yet present on the local file system.
*/
private fun download(fileName: String): Path {
val path: Path = Paths.get(Config.home.toString(), fileName)
if (Files.notExists(path)) {
val url = "https://roboquant-public.s3.eu-west-1.amazonaws.com/avro/$fileName"
// val url = "https://github.com/neurallayer/roboquant-data/blob/main/avro/$fileName?raw=true"
logger.info("Downloading data from $url...")
val website = URL(url)
website.openStream().use { inputStream: InputStream ->
Files.copy(
inputStream, path, StandardCopyOption.REPLACE_EXISTING
)
val serialization = serializer.serialize(action)
val t = GenericData.EnumSymbol(enumSchema, serialization.type)
record.put(2, t)

val arr = GenericData.Array<Double>(serialization.values.size, arraySchema)
arr.addAll(serialization.values)
record.put(3, arr)

record.put(4, serialization.other)
dataFileWriter.append(record)
}
require(Files.exists(path))

}
return path
}

/**
* Record the price-actions in a [feed] and store them in an Avro [fileName] that can be later used as input for
* an AvroFeed. The provided [feed] needs to implement the [AssetFeed] interface.
*
* [compression] can be enabled, which results in a smaller file. The `snappy` compression codec is used, which
* achieves a decent compression ratio while keeping CPU usage limited.
*
* Additionally, you can filter on a [timeframe] and [assetFilter]. The default is to apply no filtering.
*/
@Suppress("LongParameterList")
fun record(
feed: Feed,
fileName: String,
compression: Boolean = true,
timeframe: Timeframe = Timeframe.INFINITE,
append: Boolean = false,
assetFilter: AssetFilter = AssetFilter.all()
) {
recordAvro(feed, fileName, compression, timeframe, append, assetFilter)
} catch (_: ClosedReceiveChannelException) {
// On purpose left empty, expected exception
} finally {
channel.close()
if (job.isActive) job.cancel()
dataFileWriter.sync()
dataFileWriter.close()
}
}



}
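
Finally, a usage sketch of the new API (an editor's illustration, not part of the commit). It relies only on signatures visible in this diff, plus the TradePrice and EventChannel.send signatures from roboquant at the time; the two-tick feed is hand-rolled so the sketch stays self-contained.

import org.roboquant.avro.AvroFeed
import org.roboquant.common.Asset
import org.roboquant.feeds.Event
import org.roboquant.feeds.EventChannel
import org.roboquant.feeds.Feed
import org.roboquant.feeds.TradePrice
import java.nio.file.Path
import java.time.Instant

// A tiny hand-rolled feed producing two trade prices
class TwoTickFeed : Feed {
    override suspend fun play(channel: EventChannel) {
        val asset = Asset("DEMO")
        var time = Instant.parse("2024-05-25T10:00:00Z")
        for (price in listOf(100.0, 101.5)) {
            channel.send(Event(time, listOf(TradePrice(asset, price))))
            time = time.plusSeconds(60)
        }
    }
}

fun main() {
    val avroFeed = AvroFeed(Path.of("/tmp/demo.avro"))

    // record() plays the source feed and writes every price item to the Avro file
    avroFeed.record(TwoTickFeed(), compress = false)

    // The index and timeframe are computed lazily, on first access
    println(avroFeed.timeframe)
}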

(The remaining 39 changed files are not rendered here.)
