PLAT-1123 PLAT-1124 Add more parameters to parallelization and height_increments

This PR:
* adds more parameters to the .parallel() call (PLAT-1123)
* allows parallelization to be broken down by integer block-height increments (PLAT-1124); a usage sketch follows below
victoreram committed Aug 14, 2024
1 parent da7fbf3 commit 82fa6e0
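For context, here is a sketch of how the expanded `.parallel()` call is intended to be used. The asset, metric, and increment values are illustrative, not taken from this commit:

```python
from dateutil.relativedelta import relativedelta
from coinmetrics.api_client import CoinMetricsClient

client = CoinMetricsClient("<API_KEY>")

# Time-based splitting: a six-month query becomes six month-long requests,
# fetched concurrently.
reference_rates = client.get_asset_metrics(
    assets="btc",
    metrics="ReferenceRate",
    start_time="2024-01-01",
    end_time="2024-07-01",
).parallel(time_increment=relativedelta(months=1)).export_to_dataframe()

# New in this PR: block-height-based splitting for a single asset. The
# height range is carved into chunks of 10,000 blocks.
blocks = client.get_list_of_blocks_v2(
    asset="btc",
    start_height=0,
    end_height=100_000,
).parallel(height_increment=10_000).to_list()
```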
Showing 5 changed files with 235 additions and 149 deletions.
180 changes: 134 additions & 46 deletions coinmetrics/_data_collection.py
@@ -5,15 +5,14 @@

import requests
import itertools

from dateutil.relativedelta import relativedelta
from copy import deepcopy
from gzip import GzipFile
from io import BytesIO
from logging import getLogger
from time import sleep
from datetime import datetime, timedelta, date, timezone
from typing import Any, Dict, Iterable, Iterator, List, Optional, cast, Type, Callable, Union, Generator, Tuple
from typing import Any, Dict, Iterable, Iterator, List, Optional, cast, Type, Callable, Union, Generator, Tuple, TYPE_CHECKING
from dateutil.parser import isoparse
from coinmetrics._typing import (
DataRetrievalFuncType,
@@ -28,6 +27,9 @@
from concurrent.futures import ThreadPoolExecutor, Executor
from tqdm import tqdm
from collections import defaultdict
from coinmetrics._exceptions import CoinMetricsClientNotFoundError
if TYPE_CHECKING:
from coinmetrics.api_client import CoinMetricsClient

orjson_found = True
try:
@@ -74,7 +76,8 @@ def __init__(
endpoint: str,
url_params: Dict[str, UrlParamTypes],
csv_export_supported: bool = True,
columns_to_store: List[str] = []
columns_to_store: List[str] = [],
client: Optional["CoinMetricsClient"] = None  # quoted forward reference; imported only under TYPE_CHECKING
) -> None:
self._csv_export_supported = csv_export_supported
self._data_retrieval_function = data_retrieval_function
@@ -83,6 +86,7 @@ def __init__(
self._next_page_token: Optional[str] = ""
self._current_data_iterator: Optional[Iterator[Any]] = None
self._columns_to_store = columns_to_store
self._client = client

def first_page(self) -> List[Dict[str, Any]]:
return cast(
@@ -298,7 +302,8 @@ def parallel(self,
executor: Optional[Callable[[Any], Executor]] = None,
max_workers: Optional[int] = None,
progress_bar: Optional[bool] = None,
time_increment: Optional[Union[relativedelta, timedelta]] = None
time_increment: Optional[Union[relativedelta, timedelta]] = None,
height_increment: Optional[int] = None
) -> "ParallelDataCollection":
"""
This method will convert the DataCollection into a ParallelDataCollection - enabling the ability to split
@@ -322,7 +327,9 @@
executor=executor,
max_workers=max_workers,
progress_bar=progress_bar,
time_increment=time_increment)
time_increment=time_increment,
height_increment=height_increment
)


class AssetChainsDataCollection(DataCollection):
@@ -360,8 +367,11 @@ class ParallelDataCollection(DataCollection):
"""

TIME = "time"
_VALID_PARALLELIZATION_PARAMS = {'exchanges', 'assets', 'indexes', 'metrics', 'markets', 'institutions',
'defi_protocols', 'exchange_assets', 'pairs'}
_VALID_PARALLELIZATION_PARAMS = {
'exchanges', 'assets', 'indexes', 'metrics', 'markets', 'institutions',
'defi_protocols', 'exchange_assets', 'pairs', 'txid', 'accounts',
'block_hashes', 'heights', 'sub_accounts'
}
_ENDPOINT_FIRST_PARAM_DICT = {
'blockchain-metadata/tags': 'type',
'blockchain-v2/{asset}/accounts': 'asset',
@@ -435,12 +445,16 @@ class ParallelDataCollection(DataCollection):
'reference-data/pair-metrics': 'metrics',
'reference-data/institution-metrics': 'metrics'}

def __init__(self, parent_data_collection: DataCollection,
parallelize_on: Optional[Union[str, List[str]]] = None,
executor: Optional[Callable[..., Executor]] = None,
max_workers: Optional[int] = None,
progress_bar: Optional[bool] = None,
time_increment: Optional[Union[relativedelta, timedelta]] = None):
def __init__(
self,
parent_data_collection: DataCollection,
parallelize_on: Optional[Union[str, List[str]]] = None,
executor: Optional[Callable[..., Executor]] = None,
max_workers: Optional[int] = None,
progress_bar: Optional[bool] = None,
time_increment: Optional[Union[relativedelta, timedelta]] = None,
height_increment: Optional[int] = None
):
"""
:param parallelize_on: What parameter to parallelize on. By default will use the primary query parameter in the
endpoint the user is calling. For example, if the user is calling `.get_market_candles(assets="...")` it will
@@ -455,9 +469,12 @@ def __init__(self, parent_data_collection: DataCollection,
:param time_increment: Optionally, can split the data collections by time_increment. This feature splits
data collections further by time increment. So if you split by MONTH this will split a year-long request into
12 smaller requests. If there is no "start_time" in the request it will raise a ValueError.
:param height_increment: Optionally, can split the data collections by height_increment. This feature splits
data collections further by block height increment. If there is no "start_height" in the request it defaults to 0;
if there is no "end_height", the latest block height is looked up with the attached client (single-asset queries only).
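Example (illustrative; sketches the new height-based splitting with placeholder values):
client.get_list_of_blocks_v2(
asset="btc", start_height=0, end_height=100000
).parallel(height_increment=10000)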
"""
super().__init__(parent_data_collection._data_retrieval_function, parent_data_collection._endpoint,
parent_data_collection._url_params, parent_data_collection._csv_export_supported)
parent_data_collection._url_params, parent_data_collection._csv_export_supported,
client=parent_data_collection._client)
self._parallelize_on = self._get_parallelize_on(parallelize_on)
self._executor: Callable[..., Executor] = executor if executor else ThreadPoolExecutor # type: ignore
self._max_workers = max_workers if max_workers else 10
@@ -466,7 +483,11 @@
self._max_workers = 10
self._progress_bar = progress_bar if progress_bar is not None else True
self._time_increment = time_increment
if self._time_increment is not None:
self._height_increment = height_increment
if self._time_increment is not None and self._height_increment is not None:
raise ValueError("time_increment and height_increment are mutually exclusive")

elif (self._time_increment is not None) or (self._height_increment is not None):
self._url_params.update({"end_inclusive": False})

def get_parallel_datacollections(self) -> List[DataCollection]:
@@ -509,7 +530,6 @@ def get_parallel_datacollections(self) -> List[DataCollection]:
keys = list(query_items_dict.keys())
for values_combo in itertools.product(*query_items_dict.values()):
combinations.append(dict(zip(keys, values_combo)))

for combo in combinations:
new_params = self._url_params.copy()
new_params.update(combo)
@@ -522,46 +542,114 @@ def get_parallel_datacollections(self) -> List[DataCollection]:
data_collections = self._add_time_dimension_to_data_collections(data_collections=data_collections)
return data_collections

def _add_time_dimension_to_data_collections(self, data_collections: List[DataCollection]) -> List[DataCollection]:
def _add_time_dimension_to_data_collections(
self,
data_collections: List[DataCollection]
) -> List[DataCollection]:
"""
Helper function to help create all possible combinations of time + parallelized parameters. Takes a list of
of data collections and returns a larger a list of dataframe over the time range.
Helper function to create all possible combinations of time or height + parallelized parameters. Takes a list
of data collections and returns a larger list of data collections over the time or height range.
:param data_collections: List[DataCollection] list of data collections to be expanded
:return: List[DataCollection] All combinations of the original data collections, over the specified time_increment or height_increment
"""
def generate_date_ranges(start: datetime, end: datetime, increment: Union[timedelta, relativedelta]) -> Generator[Tuple[datetime, datetime], None, None]:
def generate_ranges(
start: Union[datetime, int],
end: Union[datetime, int],
increment: Union[timedelta, relativedelta, int]
) -> Generator[Tuple[Union[datetime, int], Union[datetime, int]], None, None]:
# branch on the concrete types so mypy can narrow start, end, and increment
current = start
if isinstance(increment, timedelta):
while current < end:
next_date = current + increment
if next_date > end:
next_date = end
yield (current, next_date)
current = next_date
elif isinstance(increment, relativedelta):
while current < end:
next_date = current + increment
if next_date > end:
next_date = end
yield (current, next_date)
current = next_date
if (
isinstance(start, datetime)
and isinstance(end, datetime)
and isinstance(increment, (timedelta, relativedelta))
):
while current < end:
next_ = current + increment
if next_ > end:
next_ = end
yield (current, next_)
current = next_
elif (
isinstance(start, int)
and isinstance(end, int)
and isinstance(increment, int)
):
while current < end:
next_ = current + increment
if next_ > end:
next_ = end
yield (current, next_)
current = next_
else:
raise ValueError("Unsupported combination of types for start, end, or increment")

if not self._time_increment:
if not self._time_increment and not self._height_increment:
return data_collections
if not self._url_params.get("start_time"):
raise ValueError("No start_time specified, cannot use time_increment feature")

start_time = self.parse_date(cast(datetime, self._url_params.get("start_time")))
end_time: datetime = self.parse_date(cast(datetime, self._url_params.get("end_time"))) if self._url_params.get(
"end_time") else datetime.today()
full_data_collections = []
for start, end in generate_date_ranges(start_time, end_time, increment=self._time_increment):
for data_collection in data_collections:
new_data_collection = deepcopy(data_collection)
new_data_collection._url_params.update({"start_time": start, "end_time": end})
full_data_collections.append(new_data_collection)
if self._height_increment and isinstance(self._height_increment, int):
# start_height defaults to 0 when the request does not supply one
raw_start = self._url_params.get("start_height")
start_height = int(raw_start) if raw_start and isinstance(raw_start, (int, str)) else 0
raw_end = self._url_params.get("end_height")
if raw_end and isinstance(raw_end, (int, str)):
end_height = int(raw_end)
else:
# no end_height supplied: look up the chain tip, which requires a
# single asset and an attached CoinMetricsClient
if not self._url_params.get("asset"):
raise ValueError(
'Parameter "asset" not found in request. '
"Parallel height increments only work on a single asset; "
"consider breaking the query into asset-by-asset chunks "
"(e.g. .parallel('assets').parallel(height_increment=height_increment))."
)
asset = str(self._url_params.get("asset"))
if self._client is None:
raise CoinMetricsClientNotFoundError
block_data = self._client.get_list_of_blocks_v2(
asset=asset, paging_from='end', page_size=1
).first_page()
if not block_data:
raise Exception(f"End height for asset {asset} not found.")
end_height = int(block_data[0]['height'])

for start, end in generate_ranges(
start_height,
end_height,
increment=self._height_increment
):
for data_collection in data_collections:
new_data_collection = deepcopy(data_collection)
new_data_collection._url_params.update(
{"start_height": start, "end_height": end}
)
full_data_collections.append(new_data_collection)
elif self._time_increment and isinstance(self._time_increment, (timedelta, relativedelta)):
if not self._url_params.get("start_time"):
raise ValueError("No start_time specified, cannot use time_increment feature")
start_time = self.parse_date(
cast(datetime, self._url_params.get("start_time"))
)
end_time = (
self.parse_date(cast(datetime, self._url_params.get("end_time")))
if self._url_params.get("end_time")
else datetime.today()
)
for start, end in generate_ranges(
start_time,
end_time,
increment=self._time_increment
):
for data_collection in data_collections:
new_data_collection = deepcopy(data_collection)
new_data_collection._url_params.update({"start_time": start, "end_time": end})
full_data_collections.append(new_data_collection)
return full_data_collections

def to_list(self) -> List[Dict[str, Any]]:
7 changes: 7 additions & 0 deletions coinmetrics/_exceptions.py
@@ -50,3 +50,10 @@ def __init__(self, response: Response, *args: Any, **kwargs: Any):

def __str__(self) -> str:
return self.msg


class CoinMetricsClientNotFoundError(Exception):
"""Raised when a CoinMetricsClient instance is not found."""
def __init__(self, message: str = "CoinMetricsClient not found") -> None:
self.message = message
super().__init__(self.message)
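The diff in `coinmetrics/_data_collection.py` raises this error when a height-split collection has no attached client and no explicit end_height. A minimal sketch of that failure mode, using a hypothetical retrieval callable:

```python
from coinmetrics._data_collection import DataCollection
from coinmetrics._exceptions import CoinMetricsClientNotFoundError

def fetch(*args, **kwargs):
    # hypothetical stand-in for the client's internal retrieval function
    raise NotImplementedError

# Built without client=..., the collection cannot look up the chain tip.
collection = DataCollection(
    data_retrieval_function=fetch,
    endpoint="blockchain-v2/{asset}/accounts",
    url_params={"asset": "btc", "start_height": 0},
)

try:
    collection.parallel(height_increment=10_000).get_parallel_datacollections()
except CoinMetricsClientNotFoundError:
    pass  # supply end_height explicitly, or build the collection via CoinMetricsClient
```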