
6 cm api integration #15

Merged · 5 commits · Dec 9, 2024
207 changes: 98 additions & 109 deletions precog/miners/base_miner.py
@@ -4,117 +4,106 @@
import bittensor as bt
import pandas as pd

from precog.miners.miner import Miner
from precog.protocol import Challenge
from precog.utils.classes import Config
from precog.utils.cm_data import CMData
from precog.utils.general import parse_arguments
from precog.utils.timestamp import datetime_to_iso8601, iso8601_to_datetime
from precog.utils.timestamp import datetime_to_CM_timestamp, iso8601_to_datetime


class BaseMiner(Miner):
def get_point_estimate(timestamp: str) -> float:
"""Make a naive forecast by predicting the most recent price

def get_point_estimate(self, timestamp: str) -> float:
"""Make a naive forecast by predicting the most recent price

Args:
timestamp (str): The current timestamp provided by the validator request

Returns:
(float): The current BTC price tied to the provided timestamp
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be as small as possible for query speed
# Set the start time as 2 seconds prior to the provided time
start_time: str = datetime_to_iso8601(iso8601_to_datetime(timestamp) - timedelta(seconds=2))
end_time: str = timestamp

# Query CM API for a pandas dataframe with only one record
price_data: pd.DataFrame = cm.get_CM_ReferenceRate(assets="BTC", start=start_time, end=end_time)

# Get current price closest to the provided timestamp
btc_price: float = float(price_data["ReferenceRateUSD"].iloc[-1])

# Return the current price of BTC as our point estimate
return btc_price

def get_prediction_interval(self, timestamp: str, point_estimate: float) -> Tuple[float, float]:
"""Make a naive multi-step prediction interval by estimating
the sample standard deviation

Args:
timestamp (str): The current timestamp provided by the validator request
point_estimate (float): The center of the prediction interval

Returns:
(float): The 90% naive prediction interval lower bound
(float): The 90% naive prediction interval upper bound

Notes:
Make reasonable assumptions that the 1s BTC price residuals are
uncorrelated and normally distributed
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be 24 hours
start_time: str = datetime_to_iso8601(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = timestamp

# Query CM API for sample standard deviation of the 1s residuals
historical_price_data: pd.DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
residuals: pd.Series = historical_price_data["ReferenceRateUSD"].diff()
sample_std_dev: float = float(residuals.std())

# We have the standard deviation of the 1s residuals
# We are forecasting forward 5m, which is 300s
# We must scale the 1s sample standard deviation to reflect a 300s forecast
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
# To do this naively, we multiply the std dev by the square root of the number of time steps
time_steps: int = 300
naive_forecast_std_dev: float = sample_std_dev * (time_steps**0.5)

# For a 90% prediction interval, we use the coefficient 1.64
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
coefficient: float = 1.64

# Calculate the lower bound and upper bound
lower_bound: float = point_estimate - coefficient * naive_forecast_std_dev
upper_bound: float = point_estimate + coefficient * naive_forecast_std_dev

# Return the naive prediction interval for our forecast
return lower_bound, upper_bound

async def forward(self, synapse: Challenge) -> Challenge:
bt.logging.info(
f"👈 Received prediction request from: {synapse.dendrite.hotkey} for timestamp: {synapse.timestamp}"
)

# Get the naive point estimate
point_estimate: float = self.get_point_estimate(timestamp=synapse.timestamp)

# Get the naive prediction interval
prediction_interval: Tuple[float, float] = self.get_prediction_interval(
timestamp=synapse.timestamp, point_estimate=point_estimate
)

synapse.prediction = point_estimate
synapse.interval = prediction_interval

if synapse.prediction is not None:
bt.logging.success(f"Predicted price: {synapse.prediction} | Predicted Interval: {synapse.interval}")
else:
bt.logging.info("No prediction for this request.")
return synapse


# Run the miner
if __name__ == "__main__":
args = parse_arguments()
config = Config(args)
miner = BaseMiner(config=config)
miner.loop.run_forever()
Args:
timestamp (str): The current timestamp provided by the validator request

Returns:
(float): The current BTC price tied to the provided timestamp
"""
# Create data gathering instance
cm = CMData()

# Set the time range for the CM query
# Use a one-day window ending at the provided time so the latest record is always returned
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting

# Query CM API for a pandas dataframe of reference rates; only the most recent record is used
price_data: pd.DataFrame = cm.get_CM_ReferenceRate(assets="BTC", start=start_time, end=end_time)

# Get current price closest to the provided timestamp
btc_price: float = float(price_data["ReferenceRateUSD"].iloc[-1])

# Return the current price of BTC as our point estimate
return btc_price


def get_prediction_interval(timestamp: str, point_estimate: float) -> Tuple[float, float]:
"""Make a naive multi-step prediction interval by estimating
the sample standard deviation

Args:
timestamp (str): The current timestamp provided by the validator request
point_estimate (float): The center of the prediction interval

Returns:
(float): The 90% naive prediction interval lower bound
(float): The 90% naive prediction interval upper bound

Notes:
Make reasonable assumptions that the 1s BTC price residuals are
uncorrelated and normally distributed
"""
# Create data gathering instance
cm = CMData()

# Set the time range to be 24 hours
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(days=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting

# Query CM API for sample standard deviation of the 1s residuals
historical_price_data: pd.DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
residuals: pd.Series = historical_price_data["ReferenceRateUSD"].diff()
sample_std_dev: float = float(residuals.std())

# We have the standard deviation of the 1s residuals
# We are forecasting forward 5m, which is 300s
# We must scale the 1s sample standard deviation to reflect a 300s forecast
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
# To do this naively, we multiply the std dev by the square root of the number of time steps
time_steps: int = 300
naive_forecast_std_dev: float = sample_std_dev * (time_steps**0.5)

# For a 90% prediction interval, we use the coefficient 1.64
# Make reasonable assumptions that the 1s residuals are uncorrelated and normally distributed
coefficient: float = 1.64

# Calculate the lower bound and upper bound
lower_bound: float = point_estimate - coefficient * naive_forecast_std_dev
upper_bound: float = point_estimate + coefficient * naive_forecast_std_dev

# Return the naive prediction interval for our forecast
return lower_bound, upper_bound


def forward(synapse: Challenge) -> Challenge:
bt.logging.info(
f"👈 Received prediction request from: {synapse.dendrite.hotkey} for timestamp: {synapse.timestamp}"
)

# Get the naive point estimate
point_estimate: float = get_point_estimate(timestamp=synapse.timestamp)

# Get the naive prediction interval
prediction_interval: Tuple[float, float] = get_prediction_interval(
timestamp=synapse.timestamp, point_estimate=point_estimate
)

synapse.prediction = point_estimate
synapse.interval = prediction_interval

if synapse.prediction is not None:
bt.logging.success(f"Predicted price: {synapse.prediction} | Predicted Interval: {synapse.interval}")
else:
bt.logging.info("No prediction for this request.")
return synapse
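A note on the interval math in the new get_prediction_interval: under the stated assumption that the 1s residuals are uncorrelated and normally distributed, the 300s forecast standard deviation is the 1s standard deviation scaled by sqrt(300), and 1.64 is (a rounding of) the two-sided 90% normal quantile. A minimal standalone sketch with hypothetical numbers (the standard deviation and price below are placeholders, not CM API values):

```python
# Illustrative sketch of the square-root-of-time scaling used above; all numbers are hypothetical.
sample_std_dev = 5.0       # pretend 1s residual std dev, in USD
point_estimate = 60_000.0  # pretend BTC point estimate, in USD

time_steps = 300                                            # forecasting 5 minutes ahead at 1s resolution
naive_forecast_std_dev = sample_std_dev * time_steps**0.5   # 5.0 * sqrt(300) ≈ 86.6

coefficient = 1.645                                          # two-sided 90% normal quantile (the PR rounds to 1.64)
lower_bound = point_estimate - coefficient * naive_forecast_std_dev  # ≈ 59_857.5
upper_bound = point_estimate + coefficient * naive_forecast_std_dev  # ≈ 60_142.5
print(lower_bound, upper_bound)
```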
30 changes: 0 additions & 30 deletions precog/miners/forward_bad.py

This file was deleted.

2 changes: 1 addition & 1 deletion precog/miners/miner.py
@@ -22,7 +22,7 @@ class Miner:
def __init__(self, config=None):
args = parse_arguments()
config = Config(args)
self.forward_module = importlib.import_module(f"precog.miners.{config.forward_function}", package="forward")
self.forward_module = importlib.import_module(f"precog.miners.{config.forward_function}")
self.config = config
self.config.neuron.type = "Miner"
setup_bittensor_objects(self)
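On the importlib change above: the package= argument of importlib.import_module is only consulted for relative imports (names starting with a dot), so it was a no-op for the absolute "precog.miners.…" path and dropping it is a cleanup. A minimal sketch of the dynamic-loading pattern, assuming (as in this PR) the target module exposes a module-level forward callable:

```python
# Minimal sketch of loading a forward implementation by name; "base_miner" is one of the
# modules touched in this PR, and exposing `forward` at module level is an assumption.
import importlib

forward_function = "base_miner"  # would normally come from config.forward_function
forward_module = importlib.import_module(f"precog.miners.{forward_function}")
forward = getattr(forward_module, "forward")  # the callable the miner dispatches requests to
```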
2 changes: 1 addition & 1 deletion precog/protocol.py
@@ -35,7 +35,7 @@ class Challenge(bt.Synapse):
)

# Optional request output, filled by receiving axon.
prediction: Optional[List[float]] = pydantic.Field(
prediction: Optional[float] = pydantic.Field(
default=None,
title="Predictions",
description="The predictions to send to the dendrite caller",
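The schema change above narrows prediction from a list of floats to a single float. A rough sketch of the resulting response shape, using plain pydantic rather than bt.Synapse; the interval field's type here is inferred from how forward() fills it and is an assumption:

```python
# Rough stand-in for the Challenge response fields after this PR (not the real bt.Synapse class).
from typing import List, Optional

import pydantic


class ChallengeSketch(pydantic.BaseModel):
    timestamp: str                           # request input from the validator
    prediction: Optional[float] = None       # was Optional[List[float]] before this PR
    interval: Optional[List[float]] = None   # [lower_bound, upper_bound], assumed shape


resp = ChallengeSketch(
    timestamp="2024-12-09T00:00:00+00:00",
    prediction=60_000.0,
    interval=[59_857.5, 60_142.5],
)
```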
8 changes: 8 additions & 0 deletions precog/utils/general.py
@@ -8,6 +8,7 @@
import git
import requests
from numpy import argsort, array, concatenate, cumsum, empty_like
from pandas import DataFrame

from precog.utils.classes import NestedNamespace

@@ -138,3 +139,10 @@ def func_with_retry(func: Callable, max_attempts: int = 3, delay: float = 1, *ar
raise
else:
time.sleep(delay)


def pd_to_dict(data: DataFrame) -> dict:
price_dict = {}
for i in range(len(data)):
price_dict[data.time[i].to_pydatetime()] = data.iloc[i]["ReferenceRateUSD"].item()
return price_dict
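A quick usage sketch for the new pd_to_dict helper, on a hand-built frame standing in for cm.get_CM_ReferenceRate(...) output (the column names time and ReferenceRateUSD come from this PR; the prices are made up):

```python
# Demonstrates pd_to_dict keying each ReferenceRateUSD value by the row's `time` as a python datetime.
import pandas as pd

from precog.utils.general import pd_to_dict

df = pd.DataFrame(
    {
        "time": pd.to_datetime(["2024-12-09 00:00:00", "2024-12-09 00:00:01"], utc=True),
        "ReferenceRateUSD": [60_000.0, 60_001.5],
    }
)

price_dict = pd_to_dict(df)
# -> {datetime(2024, 12, 9, 0, 0, tz=UTC): 60000.0, datetime(2024, 12, 9, 0, 0, 1, tz=UTC): 60001.5}
```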
31 changes: 14 additions & 17 deletions precog/utils/timestamp.py
@@ -66,7 +66,7 @@ def datetime_to_iso8601(timestamp: datetime) -> str:
"""
Convert datetime to iso 8601 string
"""
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return timestamp.isoformat()


def iso8601_to_datetime(timestamp: str) -> datetime:
@@ -83,6 +83,13 @@ def posix_to_datetime(timestamp: float) -> datetime:
return datetime.fromtimestamp(timestamp, tz=get_timezone())


def datetime_to_CM_timestamp(timestamp: datetime) -> str:
"""
Convert datetime to a CoinMetrics API timestamp string
"""
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


###############################
# FUNCTIONS #
###############################
@@ -150,37 +157,27 @@ def is_query_time(prediction_interval: int, timestamp: str, tolerance: int = 120
return beginning_of_epoch


def align_timepoints(filtered_pred_dict, cm_data, cm_timestamps):
def align_timepoints(filtered_pred_dict, cm_dict):
"""Takes in a dictionary of predictions and aligns them to a list of coinmetrics prices.

Args:
filtered_pred_dict (dict): {datetime: float} dictionary of predictions.
cm_data (List[float]): price data from coinmetrics corresponding to the datetimes in cm_timestamps.
cm_timestamps (List[datetime]): timestamps corresponding to the values in cm_data.
cm_dict (dict): {datetime: float} dictionary of prices.


Returns:
aligned_pred_values (List[float]): The values in filtered_pred_dict whose timestamp keys appear in cm_dict.
aligned_cm_data (List[float]): The values in cm_dict at those matching timestamps.
aligned_timestamps (List[datetime]): The timestamps corresponding to the values in aligned_pred_values and aligned_cm_data.
"""
if len(cm_data) != len(cm_timestamps):
raise ValueError("cm_data and cm_timepoints must be of the same length.")

aligned_pred_values = []
aligned_cm_data = []
aligned_timestamps = []

# Convert cm_timepoints to a set for faster lookup
cm_timestamps_set = set(cm_timestamps)
# Loop through filtered_pred_dict to find matching datetime keys
for timestamp, pred_value in filtered_pred_dict.items():
if timestamp in cm_timestamps_set:
# Find the index in cm_timepoints to get corresponding cm_data
index = cm_timestamps.index(timestamp)
aligned_pred_values.append(pred_value)
for timestamp, value in filtered_pred_dict.items():
if timestamp in cm_dict:
aligned_pred_values.append(value)
aligned_timestamps.append(timestamp)
aligned_cm_data.append(cm_data[index])
aligned_cm_data.append(cm_dict[timestamp])
return aligned_pred_values, aligned_cm_data, aligned_timestamps


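Two small sketches of the timestamp.py changes above: datetime_to_CM_timestamp's output format, and the new dict-based align_timepoints contract, where only timestamps present in both inputs survive. All values are made up:

```python
from datetime import datetime, timezone

from precog.utils.timestamp import align_timepoints, datetime_to_CM_timestamp

t0 = datetime(2024, 12, 9, 0, 0, tzinfo=timezone.utc)
t1 = datetime(2024, 12, 9, 0, 5, tzinfo=timezone.utc)
t2 = datetime(2024, 12, 9, 0, 10, tzinfo=timezone.utc)

# CM API formatting: "%Y-%m-%dT%H:%M:%S.%fZ"
datetime_to_CM_timestamp(t0)  # -> "2024-12-09T00:00:00.000000Z"

# Predictions keyed by datetime, aligned against a {datetime: price} dict; t1 has no CM price.
predictions = {t0: 60_010.0, t1: 60_050.0, t2: 60_125.0}
cm_dict = {t0: 60_000.0, t2: 60_100.0}

preds, prices, stamps = align_timepoints(predictions, cm_dict)
# preds  == [60010.0, 60125.0]
# prices == [60000.0, 60100.0]
# stamps == [t0, t2]
```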
25 changes: 15 additions & 10 deletions precog/validators/reward.py
@@ -3,10 +3,12 @@

import bittensor as bt
import numpy as np
from pandas import DataFrame

from precog.protocol import Challenge
from precog.utils.general import rank
from precog.utils.timestamp import align_timepoints, get_now, mature_dictionary, round_minute_down
from precog.utils.cm_data import CMData
from precog.utils.general import pd_to_dict, rank
from precog.utils.timestamp import align_timepoints, datetime_to_CM_timestamp, iso8601_to_datetime, mature_dictionary


################################################################################
@@ -20,21 +22,24 @@ def calc_rewards(
decay = 0.9
weights = np.linspace(0, len(self.available_uids) - 1, len(self.available_uids))
decayed_weights = decay**weights
# cm_prices, cm_timestamps = get_cm_prices() # fake placeholder to get the past hours prices
cm_prices = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cm_timestamps = [
round_minute_down(get_now()) - timedelta(minutes=(i + 1) * 5) for i in range(12)
] # placeholder to align cm price timepoints to the timestamps in history
cm_timestamps.reverse()
timestamp = responses[0].timestamp
cm = CMData()
start_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(hours=1))
end_time: str = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp)) # built-ins handle CM API's formatting
# Query CM API for the past hour of 1s reference rate prices
historical_price_data: DataFrame = cm.get_CM_ReferenceRate(
assets="BTC", start=start_time, end=end_time, frequency="1s"
)
cm_data = pd_to_dict(historical_price_data)
for uid, response in zip(self.available_uids, responses):
current_miner = self.MinerHistory[uid]
self.MinerHistory[uid].add_prediction(response.timestamp, response.prediction, response.interval)
prediction_dict, interval_dict = current_miner.format_predictions(response.timestamp)
mature_time_dict = mature_dictionary(prediction_dict)
preds, price, aligned_pred_timestamps = align_timepoints(mature_time_dict, cm_prices, cm_timestamps)
preds, price, aligned_pred_timestamps = align_timepoints(mature_time_dict, cm_data)
for i, j, k in zip(preds, price, aligned_pred_timestamps):
bt.logging.debug(f"Prediction: {i} | Price: {j} | Aligned Prediction: {k}")
inters, interval_prices, aligned_int_timestamps = align_timepoints(interval_dict, cm_prices, cm_timestamps)
inters, interval_prices, aligned_int_timestamps = align_timepoints(interval_dict, cm_data)
for i, j, k in zip(inters, interval_prices, aligned_int_timestamps):
bt.logging.debug(f"Interval: {i} | Interval Price: {j} | Aligned TS: {k}")
point_errors.append(point_error(preds, price))
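Pulling the reward-side pieces together: the validator now fetches real CM reference rates for the hour before the challenge timestamp and aligns them against miner predictions, replacing the placeholder price list. A condensed sketch of that flow, with a hypothetical timestamp (and assuming iso8601_to_datetime accepts that format):

```python
# Condensed sketch of the calc_rewards price fetch introduced above; the timestamp is hypothetical.
from datetime import timedelta

from precog.utils.cm_data import CMData
from precog.utils.general import pd_to_dict
from precog.utils.timestamp import datetime_to_CM_timestamp, iso8601_to_datetime

timestamp = "2024-12-09T12:00:00+00:00"  # would come from responses[0].timestamp

cm = CMData()
start_time = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp) - timedelta(hours=1))
end_time = datetime_to_CM_timestamp(iso8601_to_datetime(timestamp))

# One hour of 1s BTC reference rates, converted to {datetime: float} for alignment.
cm_data = pd_to_dict(cm.get_CM_ReferenceRate(assets="BTC", start=start_time, end=end_time, frequency="1s"))
# cm_data then feeds align_timepoints(mature_time_dict, cm_data) exactly as in the diff above.
```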