Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sampling logic for density #1831

Merged
merged 18 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions splink/cluster_studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,41 +227,51 @@ def _get_cluster_id_of_each_size(
return df_cluster_sample_with_size.as_record_dict()


def _get_lowest_density_cluster_ids(
def _get_lowest_density_clusters(
linker: "Linker",
df_cluster_metrics: SplinkDataFrame,
sample_size: int,
rows_per_partition: int,
min_nodes: int,
):
"""Retrieves cluster IDs based on density metric, ordered from least to
most dense.
"""Returns ids of lowest density clusters of different sizes by
performing stratified sampling.

Args:
linker: An instance of the Splink Linker class.
df_cluster_metrics (SplinkDataFrame): dataframe containing
cluster metrics, including density.
sample_size (int): size of sample returned.
min_nodes (int): The minimum number of nodes a cluster must contain
cluster metrics including density.
rows_per_partition (int): number of rows in each strata (partition)
min_nodes (int): minimum number of nodes a cluster must contain
to be included in the sample.

Returns:
list: A list of cluster IDs based on density metric, ordered from least to
most dense.
list: A list of cluster ids of lowest density clusters of different sizes.
"""
# Ordering: least dense clusters first

sql = f"""
SELECT cluster_id
FROM {df_cluster_metrics.physical_name}
WHERE n_nodes >= {min_nodes}
ORDER BY density
LIMIT {sample_size}
select
cluster_id,
n_nodes,
density,
row_number() over (partition by n_nodes order by density, cluster_id) as row_num
from {df_cluster_metrics.physical_name}
where n_nodes >= {min_nodes}
"""

df_density_sample = linker._sql_to_splink_dataframe_checking_cache(
sql, "__splink__density_sample"
)
linker._enqueue_sql(sql, "__splink__partition_clusters_by_size")

sql = f"""
select
cluster_id,
round(density, 4) as density_4dp
from __splink__partition_clusters_by_size
where row_num <= {rows_per_partition}
"""

linker._enqueue_sql(sql, "__splink__lowest_density_clusters")
df_lowest_density_clusters = linker._execute_sql_pipeline()

return [r["cluster_id"] for r in df_density_sample.as_record_dict()]
return df_lowest_density_clusters.as_record_dict()


def render_splink_cluster_studio_html(
Expand Down Expand Up @@ -299,7 +309,7 @@ def render_splink_cluster_studio_html(
]
cluster_ids = [c["cluster_id"] for c in cluster_ids]
named_clusters_dict = dict(zip(cluster_ids, cluster_names))
if sampling_method == "lowest_density_clusters":
if sampling_method == "lowest_density_clusters_by_size":
if _df_cluster_metrics is None:
raise SplinkException(
"""To sample by density, you must provide a cluster metrics table
Expand All @@ -309,9 +319,17 @@ def render_splink_cluster_studio_html(
else:
# Using sensible default for min_nodes. Might become option
# for users in future
cluster_ids = _get_lowest_density_cluster_ids(
linker, _df_cluster_metrics, sample_size, min_nodes=3
cluster_ids = _get_lowest_density_clusters(
linker, _df_cluster_metrics, rows_per_partition=1, min_nodes=3
)
if len(cluster_ids) > sample_size:
cluster_ids = random.sample(cluster_ids, k=sample_size)
cluster_names = [
f"Cluster ID: {c['cluster_id']}, density (4dp) {c['density_4dp']}"
for c in cluster_ids
]
cluster_ids = [c["cluster_id"] for c in cluster_ids]
named_clusters_dict = dict(zip(cluster_ids, cluster_names))

cluster_recs = df_clusters_as_records(linker, df_clustered_nodes, cluster_ids)
df_nodes = create_df_nodes(linker, df_clustered_nodes, cluster_ids)
Expand Down
27 changes: 17 additions & 10 deletions tests/test_cluster_studio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd

from splink.cluster_studio import _get_lowest_density_cluster_ids
from splink.cluster_studio import _get_lowest_density_clusters
from splink.duckdb.linker import DuckDBLinker


Expand All @@ -16,14 +16,14 @@ def test_density_sample():
linker = DuckDBLinker(df, settings)

# Dummy cluster metrics table
cluster = ["A", "B", "C", "D", "E"]
n_nodes = [3, 2, 10, 3, 19]
n_edges = [2, 1, 5, 2, 25]
cluster = ["A", "B", "C", "D", "E", "F"]
n_nodes = [2, 3, 3, 3, 10, 10]
n_edges = [1, 2, 2, 3, 9, 20]
density = [
(n_edges * 2) / (n_nodes * (n_nodes - 1))
for n_nodes, n_edges in zip(n_nodes, n_edges)
]
df_metrics = pd.DataFrame(
pd_metrics = pd.DataFrame(
{
"cluster_id": cluster,
"n_nodes": n_nodes,
Expand All @@ -34,10 +34,17 @@ def test_density_sample():

# Convert to Splink dataframe
df_cluster_metrics = linker.register_table(
df_metrics, "df_cluster_metrics", overwrite=True
pd_metrics, "df_cluster_metrics", overwrite=True
)
df_result = _get_lowest_density_cluster_ids(
linker, df_cluster_metrics, sample_size=3, min_nodes=3
result = _get_lowest_density_clusters(
linker, df_cluster_metrics, rows_per_partition=1, min_nodes=3
)
df_expect = ["C", "E", "A"]
assert df_result == df_expect

result = sorted(result, key=lambda x: x["cluster_id"])

expect = [
{"cluster_id": "B", "density_4dp": 0.6667},
{"cluster_id": "E", "density_4dp": 0.2},
]

assert result == expect
Loading