Merge pull request #15 from coinmetrics/6-cm-api-integration
6 cm api integration
mtrudeau-foundry-digital authored Dec 9, 2024
2 parents 63e284c + c0dd328 commit 0189c99
Showing 8 changed files with 147 additions and 171 deletions.
207 changes: 98 additions & 109 deletions precog/miners/base_miner.py
@@ -4,117 +4,106 @@
import bittensor as bt
import pandas as pd

from precog.miners.miner import Miner
from precog.protocol import Challenge
from precog.utils.classes import Config
from precog.utils.cm_data import CMData
from precog.utils.general import parse_arguments
from precog.utils.timestamp import datetime_to_iso8601, iso8601_to_datetime
from precog.utils.timestamp import datetime_to_CM_timestamp, iso8601_to_datetime


class BaseMiner(Miner):
def get_point_estimate(timestamp: str) -> float:
"""Make a naive forecast by predicting the most recent price
def get_point_estimate(self, timestamp: str) -> float:
"""Make a naive forecast by predicting the most recent price
Args:
timestamp (str): The current timestamp provided by the validator request
Returns:
(float): The current BTC price tied to the provided timestamp
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be as small as possible for query speed
# Set the start time as 2 seconds prior to the provided time
start_time: str = datetime_to_iso8601(iso8601_to_datetime(timestamp) - timedelta(seconds=2))
end_time: str = timestamp

# Query CM API for a pandas dataframe with only one record
price_data: pd.DataFrame = cm.get_CM_ReferenceRate(assets="BTC", start=start_time, end=end_time)

# Get current price closest to the provided timestamp
btc_price: float = float(price_data["ReferenceRateUSD"].iloc[-1])

# Return the current price of BTC as our point estimate
return btc_price

def get_prediction_interval(self, timestamp: str, point_estimate: float) -> Tuple[float, float]:
"""Make a naive multi-step prediction interval by estimating
the sample standard deviation
Args:
timestamp (str): The current timestamp provided by the validator request
point_estimate (float): The center of the prediction interval
Returns:
(float): The 90% naive prediction interval lower bound
(float): The 90% naive prediction interval upper bound
Notes:
Make reasonable assumptions that the 1s BTC price residuals are
uncorrelated and normally distributed
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be 24 hours
start_time: str = datetime_to_iso8601(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = timestamp

# Query CM API for sample standard deviation of the 1s residuals
historical_price_data: pd.DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
residuals: pd.Series = historical_price_data["ReferenceRateUSD"].diff()
sample_std_dev: float = float(residuals.std())

# We have the standard deviation of the 1s residuals
# We are forecasting forward 5m, which is 300s
# We must scale the 1s sample standard deviation to reflect a 300s forecast
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
# To do this naively, we multiply the std dev by the square root of the number of time steps
time_steps: int = 300
naive_forecast_std_dev: float = sample_std_dev * (time_steps**0.5)

# For a 90% prediction interval, we use the coefficient 1.64
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
coefficient: float = 1.64

# Calculate the lower bound and upper bound
lower_bound: float = point_estimate - coefficient * naive_forecast_std_dev
upper_bound: float = point_estimate + coefficient * naive_forecast_std_dev

# Return the naive prediction interval for our forecast
return lower_bound, upper_bound

async def forward(self, synapse: Challenge) -> Challenge:
bt.logging.info(
f"👈 Received prediction request from: {synapse.dendrite.hotkey} for timestamp: {synapse.timestamp}"
)

# Get the naive point estimate
point_estimate: float = self.get_point_estimate(timestamp=synapse.timestamp)

# Get the naive prediction interval
prediction_interval: Tuple[float, float] = self.get_prediction_interval(
timestamp=synapse.timestamp, point_estimate=point_estimate
)

synapse.prediction = point_estimate
synapse.interval = prediction_interval

if synapse.prediction is not None:
bt.logging.success(f"Predicted price: {synapse.prediction} | Predicted Interval: {synapse.interval}")
else:
bt.logging.info("No prediction for this request.")
return synapse


# Run the miner
if __name__ == "__main__":
args = parse_arguments()
config = Config(args)
miner = BaseMiner(config=config)
miner.loop.run_forever()
Args:
timestamp (str): The current timestamp provided by the validator request
Returns:
(float): The current BTC price tied to the provided timestamp
"""
# Create data gathering instance
cm = CMData()

# Set the time range for the CM ReferenceRate query
# Use a 24-hour lookback window ending at the provided timestamp
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting

# Query CM API for a pandas dataframe of ReferenceRate records over the window
price_data: pd.DataFrame = cm.get_CM_ReferenceRate(assets="BTC", start=start_time, end=end_time)

# Get current price closest to the provided timestamp
btc_price: float = float(price_data["ReferenceRateUSD"].iloc[-1])

# Return the current price of BTC as our point estimate
return btc_price


def get_prediction_interval(timestamp: str, point_estimate: float) -> Tuple[float, float]:
"""Make a naive multi-step prediction interval by estimating
the sample standard deviation
Args:
timestamp (str): The current timestamp provided by the validator request
point_estimate (float): The center of the prediction interval
Returns:
(float): The 90% naive prediction interval lower bound
(float): The 90% naive prediction interval upper bound
Notes:
Make reasonable assumptions that the 1s BTC price residuals are
uncorrelated and normally distributed
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be 24 hours
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting

# Query CM API for sample standard deviation of the 1s residuals
historical_price_data: pd.DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
residuals: pd.Series = historical_price_data["ReferenceRateUSD"].diff()
sample_std_dev: float = float(residuals.std())

# We have the standard deviation of the 1s residuals
# We are forecasting forward 5m, which is 300s
# We must scale the 1s sample standard deviation to reflect a 300s forecast
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
# To do this naively, we multiply the std dev by the square root of the number of time steps
time_steps: int = 300
naive_forecast_std_dev: float = sample_std_dev * (time_steps**0.5)

# For a 90% prediction interval, we use the coefficient 1.64
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
coefficient: float = 1.64

# Calculate the lower bound and upper bound
lower_bound: float = point_estimate - coefficient * naive_forecast_std_dev
upper_bound: float = point_estimate + coefficient * naive_forecast_std_dev

# Return the naive prediction interval for our forecast
return lower_bound, upper_bound


def forward(synapse: Challenge) -> Challenge:
bt.logging.info(
f"👈 Received prediction request from: {synapse.dendrite.hotkey} for timestamp: {synapse.timestamp}"
)

# Get the naive point estimate
point_estimate: float = get_point_estimate(timestamp=synapse.timestamp)

# Get the naive prediction interval
prediction_interval: Tuple[float, float] = get_prediction_interval(
timestamp=synapse.timestamp, point_estimate=point_estimate
)

synapse.prediction = point_estimate
synapse.interval = prediction_interval

if synapse.prediction is not None:
bt.logging.success(f"Predicted price: {synapse.prediction} | Predicted Interval: {synapse.interval}")
else:
bt.logging.info("No prediction for this request.")
return synapse
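
For reference, the interval arithmetic in get_prediction_interval reduces to scaling the 1s sample standard deviation by the square root of the 300-step horizon and applying the two-sided 90% normal coefficient. A minimal sketch, substituting a synthetic random walk for the CM ReferenceRate data (the synthetic_prices array, seed, and price level are illustrative assumptions, not part of this repository):

import numpy as np

# Stand-in for 24 hours of 1s ReferenceRateUSD values: a random walk around $60k
rng = np.random.default_rng(seed=0)
synthetic_prices = 60_000 + np.cumsum(rng.normal(0, 5, size=86_400))

point_estimate = float(synthetic_prices[-1])        # naive point estimate: most recent price
residuals = np.diff(synthetic_prices)               # 1s price changes
sample_std_dev = float(residuals.std(ddof=1))       # sample std dev of the 1s residuals

time_steps = 300                                    # forecasting 5 minutes ahead at 1s resolution
naive_forecast_std_dev = sample_std_dev * time_steps**0.5  # sqrt-of-time scaling for uncorrelated steps

coefficient = 1.64                                  # two-sided 90% coefficient under normality
lower_bound = point_estimate - coefficient * naive_forecast_std_dev
upper_bound = point_estimate + coefficient * naive_forecast_std_dev
print(lower_bound, point_estimate, upper_bound)

With uncorrelated residuals the 300-step variance is 300 times the 1s variance, which is where the square-root scaling comes from.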
30 changes: 0 additions & 30 deletions precog/miners/forward_bad.py

This file was deleted.

2 changes: 1 addition & 1 deletion precog/miners/miner.py
@@ -22,7 +22,7 @@ class Miner:
def __init__(self, config=None):
args = parse_arguments()
config = Config(args)
self.forward_module = importlib.import_module(f"precog.miners.{config.forward_function}", package="forward")
self.forward_module = importlib.import_module(f"precog.miners.{config.forward_function}")
self.config = config
self.config.neuron.type = "Miner"
setup_bittensor_objects(self)
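
The one-line change above drops the package argument because importlib.import_module only needs it to resolve relative names; precog.miners.<forward_function> is already an absolute module path. A minimal sketch of the dispatch pattern (the module name is an illustrative assumption for what config.forward_function might hold):

import importlib

forward_function = "base_miner"  # e.g. the value of config.forward_function
forward_module = importlib.import_module(f"precog.miners.{forward_function}")

# The miner then delegates incoming synapses to the module-level forward()
# response = forward_module.forward(synapse)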
2 changes: 1 addition & 1 deletion precog/protocol.py
@@ -35,7 +35,7 @@ class Challenge(bt.Synapse):
)

# Optional request output, filled by receiving axon.
prediction: Optional[List[float]] = pydantic.Field(
prediction: Optional[float] = pydantic.Field(
default=None,
title="Predictions",
description="The predictions to send to the dendrite caller",
8 changes: 8 additions & 0 deletions precog/utils/general.py
@@ -8,6 +8,7 @@
import git
import requests
from numpy import argsort, array, concatenate, cumsum, empty_like
from pandas import DataFrame

from precog.utils.classes import NestedNamespace

@@ -138,3 +139,10 @@ def func_with_retry(func: Callable, max_attempts: int = 3, delay: float = 1, *ar
raise
else:
time.sleep(delay)


def pd_to_dict(data: DataFrame) -> dict:
price_dict = {}
for i in range(len(data)):
price_dict[data.time[i].to_pydatetime()] = data.iloc[i]["ReferenceRateUSD"].item()
return price_dict
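
The new pd_to_dict helper flattens the CM ReferenceRate DataFrame into a {datetime: price} mapping, which is the shape align_timepoints now consumes. A minimal sketch with a hand-built two-row frame (the timestamps and prices are made up for illustration; assumes the precog package is importable):

import pandas as pd

from precog.utils.general import pd_to_dict

data = pd.DataFrame(
    {
        "time": pd.to_datetime(["2024-12-09 00:00:00+00:00", "2024-12-09 00:00:01+00:00"]),
        "ReferenceRateUSD": [99850.12, 99851.03],
    }
)

price_dict = pd_to_dict(data)
# keys are timezone-aware datetime objects, values are plain floats
print(price_dict)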
31 changes: 14 additions & 17 deletions precog/utils/timestamp.py
@@ -66,7 +66,7 @@ def datetime_to_iso8601(timestamp: datetime) -> str:
"""
Convert datetime to iso 8601 string
"""
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return timestamp.isoformat()


def iso8601_to_datetime(timestamp: str) -> datetime:
@@ -83,6 +83,13 @@ def posix_to_datetime(timestamp: float) -> datetime:
return datetime.fromtimestamp(timestamp, tz=get_timezone())


def datetime_to_CM_timestamp(timestamp: datetime) -> str:
"""
Convert datetime to coinmetrics timestamp string
"""
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


###############################
# FUNCTIONS #
###############################
@@ -150,37 +157,27 @@ def is_query_time(prediction_interval: int, timestamp: str, tolerance: int = 120
return beginning_of_epoch


def align_timepoints(filtered_pred_dict, cm_data, cm_timestamps):
def align_timepoints(filtered_pred_dict, cm_dict):
"""Takes in a dictionary of predictions and aligns them to a list of coinmetrics prices.
Args:
filtered_pred_dict (dict): {datetime: float} dictionary of predictions.
cm_data (List[float]): price data from coinmetrics corresponding to the datetimes in cm_timestamps.
cm_timestamps (List[datetime]): timestamps corresponding to the values in cm_data.
cm_dict (dict): {datetime: float} dictionary of coinmetrics prices.
Returns:
aligned_pred_values (List[float]): The values in filtered_pred_dict with timestamp keys that match cm_timestamps.
aligned_cm_data (List[float]): The values in cm_data where cm_timestamps matches the timestamps in filtered_pred_dict.
aligned_timestamps (List[datetime]): The timestamps corresponding to the values in aligned_pred_values and aligned_cm_data.
"""
if len(cm_data) != len(cm_timestamps):
raise ValueError("cm_data and cm_timepoints must be of the same length.")

aligned_pred_values = []
aligned_cm_data = []
aligned_timestamps = []

# Convert cm_timepoints to a set for faster lookup
cm_timestamps_set = set(cm_timestamps)
# Loop through filtered_pred_dict to find matching datetime keys
for timestamp, pred_value in filtered_pred_dict.items():
if timestamp in cm_timestamps_set:
# Find the index in cm_timepoints to get corresponding cm_data
index = cm_timestamps.index(timestamp)
aligned_pred_values.append(pred_value)
for timestamp, value in filtered_pred_dict.items():
if timestamp in cm_dict:
aligned_pred_values.append(value)
aligned_timestamps.append(timestamp)
aligned_cm_data.append(cm_data[index])
aligned_cm_data.append(cm_dict[timestamp])
return aligned_pred_values, aligned_cm_data, aligned_timestamps


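Two behavioural changes land in this file: datetime_to_iso8601 now returns a plain isoformat() string while the new datetime_to_CM_timestamp keeps the %Y-%m-%dT%H:%M:%S.%fZ form the CM API expects, and align_timepoints matches predictions against a price dictionary instead of parallel lists. A minimal sketch of both (the timestamps and prices are illustrative; assumes the precog package is importable):

from datetime import datetime, timedelta, timezone

from precog.utils.timestamp import align_timepoints, datetime_to_CM_timestamp, datetime_to_iso8601

now = datetime(2024, 12, 9, 12, 0, 0, tzinfo=timezone.utc)
print(datetime_to_iso8601(now))       # 2024-12-09T12:00:00+00:00
print(datetime_to_CM_timestamp(now))  # 2024-12-09T12:00:00.000000Z

preds = {now: 99900.0, now + timedelta(minutes=5): 99950.0}
cm_dict = {now: 99885.5, now + timedelta(minutes=5): 99930.1, now + timedelta(minutes=10): 99960.7}

values, prices, timestamps = align_timepoints(preds, cm_dict)
# values == [99900.0, 99950.0]; prices == [99885.5, 99930.1]; unmatched CM timestamps are dropped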
25 changes: 15 additions & 10 deletions precog/validators/reward.py
@@ -3,10 +3,12 @@

import bittensor as bt
import numpy as np
from pandas import DataFrame

from precog.protocol import Challenge
from precog.utils.general import rank
from precog.utils.timestamp import align_timepoints, get_now, mature_dictionary, round_minute_down
from precog.utils.cm_data import CMData
from precog.utils.general import pd_to_dict, rank
from precog.utils.timestamp import align_timepoints, datetime_to_CM_timestamp, iso8601_to_datetime, mature_dictionary


################################################################################
@@ -20,21 +22,24 @@ def calc_rewards(
decay = 0.9
weights = np.linspace(0, len(self.available_uids) - 1, len(self.available_uids))
decayed_weights = decay**weights
# cm_prices, cm_timestamps = get_cm_prices() # fake placeholder to get the past hours prices
cm_prices = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cm_timestamps = [
round_minute_down(get_now()) - timedelta(minutes=(i + 1) * 5) for i in range(12)
] # placeholder to align cm price timepoints to the timestamps in history
cm_timestamps.reverse()
timestamp = responses[0].timestamp
cm = CMData()
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(hours=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting
# Query CM API for sample standard deviation of the 1s residuals
historical_price_data: DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
cm_data = pd_to_dict(historical_price_data)
for uid, response in zip(self.available_uids, responses):
current_miner = self.MinerHistory[uid]
self.MinerHistory[uid].add_prediction(response.timestamp, response.prediction, response.interval)
prediction_dict, interval_dict = current_miner.format_predictions(response.timestamp)
mature_time_dict = mature_dictionary(prediction_dict)
preds, price, aligned_pred_timestamps = align_timepoints(mature_time_dict, cm_prices, cm_timestamps)
preds, price, aligned_pred_timestamps = align_timepoints(mature_time_dict, cm_data)
for i, j, k in zip(preds, price, aligned_pred_timestamps):
bt.logging.debug(f"Prediction: {i} | Price: {j} | Aligned Prediction: {k}")
inters, interval_prices, aligned_int_timestamps = align_timepoints(interval_dict, cm_prices, cm_timestamps)
inters, interval_prices, aligned_int_timestamps = align_timepoints(interval_dict, cm_data)
for i, j, k in zip(inters, interval_prices, aligned_int_timestamps):
bt.logging.debug(f"Interval: {i} | Interval Price: {j} | Aligned TS: {k}")
point_errors.append(point_error(preds, price))
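
The context lines above also show how miner ranks become weights: np.linspace enumerates the rank positions and decay**weights turns them into an exponentially decaying series, so the top-ranked miner gets 1.0, the next 0.9, then 0.81, and so on. A small sketch, assuming five UIDs purely for illustration:

import numpy as np

available_uids = [0, 1, 2, 3, 4]  # illustrative; the real list comes from the metagraph
decay = 0.9
weights = np.linspace(0, len(available_uids) - 1, len(available_uids))  # [0. 1. 2. 3. 4.]
decayed_weights = decay**weights
print(decayed_weights)  # [1.     0.9    0.81   0.729  0.6561]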