Skip to content

Commit

Permalink
DATA-34 Unnest Catalog V2 DataFrames
Browse files Browse the repository at this point in the history
This MR lets catalog v2 dataframes be unnested and simplifies the CatalogV2 dataframe code.

**Background in addition to JIRA ticket details**

For Catalog V1, our approach to returning catalogs involved creating a separate data type for each catalog. This resulted in several instances of redundant code and made it expanding upon each new catalog data type laborious. For Catalog V2, we'll generalize all catalog returns as a single data type and account for the differences in structure across endpoint in how the CatalogV2DataCollection object is passed.

**Potential breaking changes**

Code that manually explode catalog v2 dataframes, specifically relying on referencing specific columns such as `frequencies` and `metrics` columns, will break. They are being replaced by their singular terms to represent the individual rows that are being unnested from those fields. Having said that, catalog v2 dataframes were not all that useful in their current state and were in an experimental stage, so I think it's fine to introduce this change.

**Tests**

To test the functionality of this code, each catalog v2 endpoint was called as a dataframe and list then matched against the expected fields.
  • Loading branch information
victoreram committed Oct 4, 2024
1 parent 1f71246 commit 6261653
Show file tree
Hide file tree
Showing 4 changed files with 541 additions and 31 deletions.
125 changes: 119 additions & 6 deletions coinmetrics/_data_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from coinmetrics._utils import get_file_path_or_buffer
from coinmetrics._models import AssetChainsData, CoinMetricsAPIModel, TransactionTrackerData
from coinmetrics._catalogs import convert_catalog_dtypes, _expand_df
from importlib import import_module
from concurrent.futures import ThreadPoolExecutor, Executor
from tqdm import tqdm
Expand Down Expand Up @@ -362,12 +363,6 @@ class TransactionTrackerDataCollection(DataCollection):
API_RETURN_MODEL = TransactionTrackerData


class CatalogV2DataCollection(DataCollection):
"""
This class will be used to implement functionality specific to catalog-v2 endpoints.
"""


class ParallelDataCollection(DataCollection):
"""
This class will be used as an extension of the normal data collection, but all functions will run in parallel,
Expand Down Expand Up @@ -1009,3 +1004,121 @@ def parse_date(date_input: Union[datetime, date, str]) -> datetime:
except ValueError:
continue
raise ValueError(f"Unrecognized date format for string: {date_input}")


class CatalogV2DataCollection(DataCollection):
"""
This class is used to implement functionality specific to catalog-v2 endpoints.
"""

def __init__(
self,
data_retrieval_function: DataRetrievalFuncType,
endpoint: str,
url_params: Dict[str, UrlParamTypes],
csv_export_supported: bool = True,
columns_to_store: List[str] = [],
client: Optional[CoinMetricsClient] = None,
metric_type: Optional[str] = None,
iterable_col: Optional[str] = None,
iterable_key: Optional[str] = None,
explode_on: Optional[str] = None,
assign_to: Optional[str] = None,
nested_catalog_columns: List[str] = ["min_time", "max_time"]
):
super().__init__(
data_retrieval_function=data_retrieval_function,
endpoint=endpoint,
url_params=url_params,
csv_export_supported=csv_export_supported,
columns_to_store=columns_to_store,
client=client
)
# *-metrics data
self.metric_type = metric_type
# column where nested catalog fields live e.g. "frequencies"
self.iterable_col = iterable_col
# grain for each value in iterable_col e.g. "frequency"
self.iterable_key = iterable_key
self.explode_on = explode_on
self.assign_to = assign_to
# fields in nested dicts in catalog response
self.nested_catalog_columns = nested_catalog_columns

def to_dataframe(
self,
header: Optional[List[str]] = None,
dtype_mapper: Optional[Dict[str, Any]] = None,
optimize_pandas_types: Optional[bool] = True
) -> DataFrameType:
"""
Transforms catalog data in list form into a dataframe
:return: DataFrame
"""
df = pd.DataFrame(self)

# catalog data with no nested data
if self.iterable_col is None or not isinstance(self.iterable_col, str) or not isinstance(self.iterable_key, str):
return convert_catalog_dtypes(df)

# for *-metrics and market-* types, add frequency (depth if orderbook)
if isinstance(self.iterable_key, str):
self.nested_catalog_columns = [self.iterable_key] + self.nested_catalog_columns

def _assign_column(df_: DataFrameType, col_name: str, values: Iterable[Any]) -> DataFrameType:
return df_.assign(**{col_name: values})

if self.metric_type is not None:
# for *-metrics datatypes
mapper = df[self.metric_type].to_dict()

def _assign_metric(x: DataFrameType) -> Any:
try:
return x[self.assign_to]
except TypeError:
return None

df = df.explode(self.explode_on).assign(
metrics=lambda x: pd.Series(x[self.explode_on])
)
df[self.assign_to] = df[self.explode_on].apply(_assign_metric)
df_metrics = df.dropna(subset=[self.explode_on]).metrics.apply(
pd.Series
)
df_metrics[self.metric_type] = df_metrics.index.map(mapper)
df_metrics = df_metrics.explode(self.iterable_col)

# expand min/max time, heights, hash
for column in self.nested_catalog_columns:
df_metrics = _assign_column(
df_metrics,
column,
_expand_df(
key=column, iterable=df_metrics[self.iterable_col]
)
)
df_metrics = df_metrics.drop([self.iterable_col], axis=1)

df = (
df.drop(["metrics"], axis=1).merge(
df_metrics, on=["metric", self.metric_type], how="left"
).reset_index(drop=True)
)
df = convert_catalog_dtypes(df)
return df
else:
# for market-* data types
df = df.explode(self.iterable_col)

# expand metadata (min/max time)
for column in self.nested_catalog_columns:
df = _assign_column(
df,
column,
_expand_df(
key=column, iterable=df[self.iterable_col]
)
)
df = df.drop([self.iterable_col], axis=1)

return convert_catalog_dtypes(df)
13 changes: 12 additions & 1 deletion coinmetrics/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from logging import getLogger
from os.path import expanduser
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Set
from coinmetrics._typing import FilePathOrBuffer, UrlParamTypes

logger = getLogger("cm_client_utils")
Expand Down Expand Up @@ -115,3 +115,14 @@ def wrapper(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return wrapper

return retry_wrapper


def get_keys_from_catalog(d: Dict[str, str]) -> Set[str]:
keys = []
for k, v in d.items():
if isinstance(v, list):
for nested_dict in v:
keys.extend(get_keys_from_catalog(nested_dict))
else:
keys.append(k)
return set(keys)
Loading

0 comments on commit 6261653

Please sign in to comment.