SQLAlchemy: creating constraints
dejandbt committed Feb 14, 2025
1 parent 75268ee commit 3b04924
Showing 2 changed files with 155 additions and 169 deletions.
308 changes: 141 additions & 167 deletions src/matchbox/server/postgresql/orm.py
@@ -22,28 +22,27 @@
class ResolutionFrom(CountMixin, MBDB.MatchboxBase):
"""Resolution lineage closure table with cached truth values."""

def __init__(self, suffix=""):
self.__tablename__ = f"resolution_from{suffix}"

# Columns
self.parent = Column(
BIGINT,
ForeignKey(f"resolutions{suffix}.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
self.child = Column(
BIGINT,
ForeignKey(f"resolutions{suffix}.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
self.level = Column(INTEGER, nullable=False)
self.truth_cache = Column(FLOAT, nullable=True)

# Constraints
self.__table_args__ = (
CheckConstraint("parent != child", name="no_self_reference"),
CheckConstraint("level > 0", name="positive_level"),
)
__tablename__ = "resolution_from"

# Columns
parent = Column(
BIGINT,
ForeignKey("resolutions.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
child = Column(
BIGINT,
ForeignKey("resolutions.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
level = Column(INTEGER, nullable=False)
truth_cache = Column(FLOAT, nullable=True)

# Constraints
__table_args__ = (
CheckConstraint("parent != child", name="no_self_reference"),
CheckConstraint("level > 0", name="positive_level"),
)


class Resolutions(CountMixin, MBDB.MatchboxBase):
@@ -52,39 +51,38 @@ class Resolutions(CountMixin, MBDB.MatchboxBase):
Resolutions produce probabilities or own data in the clusters table.
"""

def __init__(self, suffix=""):
self.__tablename__ = f"resolutions{suffix}"

# Columns
self.resolution_id = Column(BIGINT, primary_key=True)
self.resolution_hash = Column(BYTEA, nullable=False)
self.type = Column(TEXT, nullable=False)
self.name = Column(TEXT, nullable=False)
self.description = Column(TEXT)
self.truth = Column(FLOAT)

# Relationships
self.source = relationship("Sources", back_populates="dataset_resolution", uselist=False)
self.probabilities = relationship(
"Probabilities", back_populates="proposed_by", cascade="all, delete-orphan"
)
self.children = relationship(
"Resolutions",
secondary=ResolutionFrom.__table__,
primaryjoin="Resolutions.resolution_id == ResolutionFrom.parent",
secondaryjoin="Resolutions.resolution_id == ResolutionFrom.child",
backref="parents",
)

# Constraints
self.__table_args__ = (
CheckConstraint(
"type IN ('model', 'dataset', 'human')",
name="resolution_type_constraints",
),
UniqueConstraint("resolution_hash", name="resolutions_hash_key"),
UniqueConstraint("name", name="resolutions_name_key"),
)
__tablename__ = "resolutions"

# Columns
resolution_id = Column(BIGINT, primary_key=True)
resolution_hash = Column(BYTEA, nullable=False)
type = Column(TEXT, nullable=False)
name = Column(TEXT, nullable=False)
description = Column(TEXT)
truth = Column(FLOAT)

# Relationships
source = relationship("Sources", back_populates="dataset_resolution", uselist=False)
probabilities = relationship(
"Probabilities", back_populates="proposed_by", cascade="all, delete-orphan"
)
children = relationship(
"Resolutions",
secondary=ResolutionFrom.__table__,
primaryjoin="Resolutions.resolution_id == ResolutionFrom.parent",
secondaryjoin="Resolutions.resolution_id == ResolutionFrom.child",
backref="parents",
)

# Constraints
__table_args__ = (
CheckConstraint(
"type IN ('model', 'dataset', 'human')",
name="resolution_type_constraints",
),
UniqueConstraint("resolution_hash", name="resolutions_hash_key"),
UniqueConstraint("name", name="resolutions_name_key"),
)

@property
def ancestors(self) -> set["Resolutions"]:
@@ -173,31 +171,30 @@ def next_id(cls) -> int:
class Sources(CountMixin, MBDB.MatchboxBase):
"""Table of sources of data for Matchbox."""

def __init__(self, suffix="", contains_temporary=False):
self.__tablename__ = "sources"

# Columns
self.resolution_id = Column(
BIGINT,
ForeignKey("resolutions.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
self.alias = Column(TEXT, nullable=False)
self.full_name = Column(TEXT, nullable=False)
self.warehouse_hash = Column(BYTEA, nullable=False)
self.id = Column(TEXT, nullable=False)
self.column_names = Column(ARRAY(TEXT), nullable=False)
self.column_aliases = Column(ARRAY(TEXT), nullable=False)
self.column_types = Column(ARRAY(TEXT), nullable=False)

# Relationships
self.dataset_resolution = relationship("Resolutions", back_populates="source")
self.clusters = relationship("Clusters", back_populates="source")

# Constraints
self.__table_args__ = (
UniqueConstraint("full_name", "warehouse_hash", name="unique_source_address"),
)
__tablename__ = "sources"

# Columns
resolution_id = Column(
BIGINT,
ForeignKey("resolutions.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
alias = Column(TEXT, nullable=False)
full_name = Column(TEXT, nullable=False)
warehouse_hash = Column(BYTEA, nullable=False)
id = Column(TEXT, nullable=False)
column_names = Column(ARRAY(TEXT), nullable=False)
column_aliases = Column(ARRAY(TEXT), nullable=False)
column_types = Column(ARRAY(TEXT), nullable=False)

# Relationships
dataset_resolution = relationship("Resolutions", back_populates="source")
clusters = relationship("Clusters", back_populates="source")

# Constraints
__table_args__ = (
UniqueConstraint("full_name", "warehouse_hash", name="unique_source_address"),
)

@classmethod
def list(cls) -> list["Sources"]:
@@ -208,69 +205,56 @@ def list(cls) -> list["Sources"]:
class Contains(CountMixin, MBDB.MatchboxBase):
"""Cluster lineage table."""

def __init__(self, suffix="", clusters_temporary=False):
self.__tablename__ = f"contains{suffix}"
__tablename__ = "contains"

# Columns
if clusters_temporary:
clusters_name = f"clusters{suffix}"
else:
clusters_name = "clusters"
# Columns
parent = Column(
BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True
)
child = Column(
BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True
)

self.parent = Column(
BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True
)
self.child = Column(
BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True
)

# Constraints and indices
self.__table_args__ = (
CheckConstraint("parent != child", name="no_self_containment"),
Index(f"ix_contains_parent_child{suffix}", "parent", "child"),
Index(f"ix_contains_child_parent", "child", "parent"),
)
# Constraints and indices
__table_args__ = (
CheckConstraint("parent != child", name="no_self_containment"),
Index("ix_contains_parent_child", "parent", "child"),
Index("ix_contains_child_parent", "child", "parent"),
)


class Clusters(CountMixin, MBDB.MatchboxBase):
"""Table of indexed data and clusters that match it."""

def __init__(self, suffix="", contains_temporary=False):
self.__tablename__ = f"clusters{suffix}"

# Columns
self.cluster_id = Column(BIGINT, primary_key=True)
self.cluster_hash = Column(BYTEA, nullable=False)

self.dataset = Column(BIGINT, ForeignKey("sources.resolution_id"), nullable=True)
# Uses array as source data may have identical rows. We can't control this
# Must be indexed or PostgreSQL incorrectly tries to use nested joins
# when retrieving small datasets in query() -- extremely slow
self.source_pk = Column(ARRAY(TEXT), index=True, nullable=True)

# Relationships
self.source = relationship("Sources", back_populates="clusters")
self.probabilities = relationship(
"Probabilities", back_populates="proposes", cascade="all, delete-orphan"
)

if contains_temporary:
contains_name = f"{Contains.__table__}{suffix}"
else:
contains_name = Contains.__table__
self.children = relationship(
"Clusters",
secondary=contains_name,
primaryjoin="Clusters.cluster_id == Contains.parent",
secondaryjoin="Clusters.cluster_id == Contains.child",
backref="parents",
)

# Constraints and indices
self.__table_args__ = (
Index(f"ix_clusters_id_gin{suffix}", self.source_pk, postgresql_using="gin"),
UniqueConstraint("cluster_hash", name=f"clusters_hash_key{suffix}"),
)
__tablename__ = "clusters"

# Columns
cluster_id = Column(BIGINT, primary_key=True)
cluster_hash = Column(BYTEA, nullable=False)
dataset = Column(BIGINT, ForeignKey("sources.resolution_id"), nullable=True)
# Uses array as source data may have identical rows. We can't control this
# Must be indexed or PostgreSQL incorrectly tries to use nested joins
# when retrieving small datasets in query() -- extremely slow
source_pk = Column(ARRAY(TEXT), index=True, nullable=True)

# Relationships
source = relationship("Sources", back_populates="clusters")
probabilities = relationship(
"Probabilities", back_populates="proposes", cascade="all, delete-orphan"
)
children = relationship(
"Clusters",
secondary=Contains.__table__,
primaryjoin="Clusters.cluster_id == Contains.parent",
secondaryjoin="Clusters.cluster_id == Contains.child",
backref="parents",
)

# Constraints and indices
__table_args__ = (
Index("ix_clusters_id_gin", source_pk, postgresql_using="gin"),
UniqueConstraint("cluster_hash", name="clusters_hash_key"),
)

@classmethod
def next_id(cls) -> int:
@@ -285,34 +269,24 @@ def next_id(cls) -> int:
class Probabilities(CountMixin, MBDB.MatchboxBase):
"""Table of probabilities that a cluster is correct, according to a resolution."""

def __init__(self, suffix="", resolutions_temporary=False, clusters_temporary=False):
self.__tablename__ = f"probabilities{suffix}"

# Columns
if resolutions_temporary:
resolutions_name = f"resolutions{suffix}"
else:
resolutions_name = "resolutions"
if clusters_temporary:
clusters_name = f"clusters{suffix}"
else:
clusters_name = "clusters"

self.resolution = Column(
BIGINT,
ForeignKey(f"{resolutions_name}.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
self.cluster = Column(
BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True
)
self.probability = Column(SMALLINT, nullable=False)

# Relationships
self.proposed_by = relationship("Resolutions", back_populates="probabilities")
self.proposes = relationship("Clusters", back_populates="probabilities")

# Constraints
self.__table_args__ = (
CheckConstraint("probability BETWEEN 0 AND 100", name="valid_probability"),
)
__tablename__ = "probabilities"

# Columns
resolution = Column(
BIGINT,
ForeignKey("resolutions.resolution_id", ondelete="CASCADE"),
primary_key=True,
)
cluster = Column(
BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True
)
probability = Column(SMALLINT, nullable=False)

# Relationships
proposed_by = relationship("Resolutions", back_populates="probabilities")
proposes = relationship("Clusters", back_populates="probabilities")

# Constraints
__table_args__ = (
CheckConstraint("probability BETWEEN 0 AND 100", name="valid_probability"),
)
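
The orm.py changes above replace per-instance column assignments made inside __init__ (parameterised by a table suffix) with plain class-level declarative attributes, which is what SQLAlchemy's declarative system expects: only class-level attributes are collected into the shared metadata. Below is a minimal standalone sketch of that pattern, not the Matchbox code itself; the Edge table, its columns, and the in-memory SQLite engine are illustrative assumptions.

from sqlalchemy import CheckConstraint, Column, Integer, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Edge(Base):
    """Illustrative table: columns and constraints declared at class level."""

    __tablename__ = "edge"

    # Class-level attributes are collected by the declarative metaclass;
    # assignments made inside __init__ never reach Base.metadata.
    parent = Column(Integer, primary_key=True)
    child = Column(Integer, primary_key=True)

    __table_args__ = (CheckConstraint("parent != child", name="no_self_reference"),)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)  # emits CREATE TABLE including the CHECK constraint

Because every table, constraint, and index is now registered on the metadata at import time, a single metadata.create_all() call can recreate any missing tables with their constraints, which the db.py change below relies on.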
16 changes: 14 additions & 2 deletions src/matchbox/server/postgresql/utils/db.py
@@ -18,6 +18,8 @@
from sqlalchemy.orm import DeclarativeMeta, Session
from sqlalchemy.exc import DatabaseError as AlchemyDatabaseError

from matchbox.server.postgresql.db import MBDB

from matchbox.common.graph import (
ResolutionEdge,
ResolutionGraph,
@@ -200,15 +202,25 @@ def _create_adbc_table_constraints(db_schema:str, sufix:str, conn:Connection) ->
"""
# Cluster

_run_queries([
statements = [
f"""DROP TABLE IF EXISTS {db_schema}.clusters""",
f"""DROP TABLE IF EXISTS {db_schema}.contains""",
f"""DROP TABLE IF EXISTS {db_schema}.probabilities""",

f"""ALTER TABLE {db_schema}.clusters_{sufix} RENAME TO clusters""",
f"""ALTER TABLE {db_schema}.contains_{sufix} RENAME TO contains""",
f"""ALTER TABLE {db_schema}.probabilities_{sufix} RENAME TO probabilities"""
], conn)
]
# Start the transaction
conn.begin()
for query in statements:
conn.execute(text(query))

MBDB.MatchboxBase.metadata.create_all(conn)

conn.commit()


return True

def _adbc_insert_data(clusters:pa.Table, contains:pa.Table, probabilities:pa.Table, suffix:str, alchemy_conn:Connection, resolution_id:int) -> bool:
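In the db.py change above, _create_adbc_table_constraints now drops the old tables, renames the suffixed staging tables into place, and calls MBDB.MatchboxBase.metadata.create_all(conn) on the same connection before committing. The sketch below shows that drop/rename/create_all sequence in isolation; the connection URL, schema name, and suffix are placeholder assumptions, it covers only the clusters table, and it uses engine.begin() rather than the explicit begin()/commit() in the diff.

from sqlalchemy import create_engine, text

from matchbox.server.postgresql.db import MBDB

# Placeholder connection details and names -- adjust for a real deployment.
engine = create_engine("postgresql+psycopg://user:password@localhost/matchbox")
db_schema, suffix = "mb", "tmp"

statements = [
    f"DROP TABLE IF EXISTS {db_schema}.clusters",
    f"ALTER TABLE {db_schema}.clusters_{suffix} RENAME TO clusters",
]

# engine.begin() wraps the whole swap in one transaction: it commits on
# success and rolls back if any statement fails (PostgreSQL DDL is transactional).
with engine.begin() as conn:
    for query in statements:
        conn.execute(text(query))
    # create_all only creates tables still missing from the metadata;
    # tables that already exist (including the renamed ones) are skipped by default.
    MBDB.MatchboxBase.metadata.create_all(conn)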
