Auto cleanup memmap scores #1086

Open
wants to merge 3 commits into base: main
41 changes: 22 additions & 19 deletions dedupe/api.py
@@ -9,7 +9,6 @@
import itertools
import logging
import multiprocessing
import os
import pickle
import sqlite3
import tempfile
@@ -98,12 +97,27 @@ class IntegralMatching(Matching):

def score(self, pairs: RecordPairs) -> Scores:
"""
Scores pairs of records. Returns pairs of tuples of records id and
associated probabilities that the pair of records are match
Scores pairs of records. Returns a numpy structured array of scores.

Args:
pairs: Iterator of pairs of records

pairs: Iterator of pairs of records, such as from the output of :func:`pairs`

Returns:
A numpy
`structured array <https://docs.scipy.org/doc/numpy/user/basics.rec.html>`_
with a dtype of `[('pairs', id_type, 2), ('score', 'f4')]`
where id_type is either a str or int,
and score is a 32-bit float in the range (0, 1].
The 'pairs' column contains pairs of ids of
the records compared and the 'score' column contains
the similarity score for that pair of records.

This array will be a numpy.array when self.num_cores is 1,
and a numpy.memmap when self.num_cores is greater than 1.
This memmap will automatically clean itself up; you don't
need to remove it yourself.

For each pair, the smaller id will be first.
"""
try:
matches = core.scoreDuplicates(
@@ -175,7 +189,6 @@ def partition(
clusters = self.cluster(pair_scores, threshold)
clusters = self._add_singletons(data, clusters)
clusters = list(clusters)
Contributor Author

Now that we have auto-cleanup, perhaps we don't want to cast this to a list? I think then this entire method would be out-of-core, right?

Same for casting to list in join()

Contributor

that's right. i think we should put in a warning for a few releases though because some folks are likely depending on it being a list instead of a generator

Contributor Author

do we want to do what Gazetteer.search() does with the generator: bool parameter?

def score(.... generator: bool = False):
    ...
    if not generator:
        warn("this deprecated")
        return list(scores)
    else:
        yield from scores
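
A side note on the sketch above: any Python function whose body contains yield is a generator, so the return list(scores) branch would never hand a list back to the caller. A minimal, self-contained sketch of the same deprecation idea, with hypothetical names that are not part of this diff, keeps the list branch in a plain wrapper:

import warnings
from typing import Iterator, Union


def _iter_scores(pair_scores) -> Iterator:
    # Stand-in for the real out-of-core iteration over scored pairs.
    yield from pair_scores


def score(pair_scores, generator: bool = False) -> Union[list, Iterator]:
    # The wrapper itself contains no yield, so returning a list here works.
    if generator:
        return _iter_scores(pair_scores)
    warnings.warn(
        "returning a list from score() is deprecated; pass generator=True",
        DeprecationWarning,
        stacklevel=2,
    )
    return list(_iter_scores(pair_scores))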

_cleanup_scores(pair_scores)
return clusters

def _add_singletons(self, data: Data, clusters: Clusters) -> Clusters:
@@ -514,7 +527,6 @@ def join(
links = pair_scores[pair_scores["score"] > threshold]

links = list(links)
_cleanup_scores(pair_scores)
return links

def one_to_one(self, scores: Scores, threshold: float = 0.0) -> Links:
@@ -805,6 +817,8 @@ def score(self, blocks: Blocks) -> Generator[Scores, None, None]:
Args:
blocks: Iterator of blocks of records

Yields:
Structured numpy arrays. See :meth:`dedupe.Dedupe.score` for more info.
"""

matches = core.scoreGazette(
@@ -946,7 +960,7 @@ def __init__(
Args:
settings_file: A file object containing settings
info produced from the
:func:`~dedupe.api.ActiveMatching.write_settings` method.
:meth:`dedupe.Dedupe.write_settings` method.

num_cores: The number of cpus to use for parallel
processing, defaults to the number of cpus
@@ -1468,14 +1482,3 @@ def flatten_training(
y.extend([encoded_y] * len(pairs))

return examples, numpy.array(y)


def _cleanup_scores(arr: Scores) -> None:
try:
mmap_file = arr.filename # type: ignore
except AttributeError:
pass
else:
del arr
if mmap_file:
os.remove(mmap_file)
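
As a reading aid for the updated score() docstring above, here is a hedged sketch of how a caller might consume the returned structured array; the helper name and the example threshold are made up and are not part of the library:

import numpy


def matches_above(scores: numpy.ndarray, threshold: float = 0.5):
    # 'scores' has dtype [('pairs', id_type, 2), ('score', 'f4')], and for
    # each pair the smaller record id comes first.
    keep = scores["score"] > threshold
    for (id_1, id_2), score in zip(scores["pairs"][keep], scores["score"][keep]):
        yield id_1, id_2, float(score)

# When num_cores > 1 the array is a numpy.memmap; per this change its backing
# file is removed automatically once the array is no longer referenced.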
21 changes: 21 additions & 0 deletions dedupe/core.py
@@ -10,6 +10,7 @@
import os
import queue
import tempfile
import weakref
from typing import TYPE_CHECKING, overload

import numpy
@@ -176,9 +177,29 @@ def scoreDuplicates(
else:
scored_pairs = numpy.array([], dtype=dtype)

# Monkeypatch in these extra methods and attributes.
Contributor

this is really cool!

Contributor Author

Yeah seriously! The first time I've used it too.

# See https://docs.python.org/3/library/weakref.html#comparing-finalizers-with-del-methods
scored_pairs.remove = weakref.finalize(scored_pairs, _cleanup_scores, scored_pairs) # type: ignore[union-attr]
scored_pairs.removed = property(_is_removed) # type: ignore[union-attr]

return scored_pairs


def _cleanup_scores(arr: Scores) -> None:
try:
mmap_file = arr.filename # type: ignore
except AttributeError:
pass
else:
del arr
if mmap_file:
os.remove(mmap_file)


def _is_removed(self):
return not self.remove.alive


def fillQueue(
queue: _Queue, iterable: Iterable[Any], stop_signals: int, chunk_size: int = 20000
) -> None:
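
For readers new to weakref.finalize, here is a minimal standalone sketch of the cleanup pattern used in core.py above; the temporary file, dtype, shape, and variable names are illustrative only:

import os
import tempfile
import weakref

import numpy


def _remove_file(path: str) -> None:
    # Finalizer callback: runs at most once, when the memmap is garbage
    # collected or when the finalizer is invoked explicitly.
    if os.path.exists(path):
        os.remove(path)


fd, path = tempfile.mkstemp(suffix=".scores")
os.close(fd)

scores = numpy.memmap(path, dtype="f4", mode="w+", shape=(10,))
# Tie the backing file's lifetime to the memmap object; the callback only
# captures the path, not the array itself.
finalizer = weakref.finalize(scores, _remove_file, path)

del scores               # in CPython the refcount hits zero here...
print(finalizer.alive)   # ...so the finalizer has already run: prints False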
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -4,10 +4,12 @@ requires = ["setuptools",
"cython"]

[tool.mypy]
python_version = "3.10"
plugins = "numpy.typing.mypy_plugin"
ignore_missing_imports = true
files = "dedupe"
check_untyped_defs = true
show_error_codes = true

[tool.pytest.ini_options]
minversion = "7.1"