From 3b049244e008a80c9e072efac92f49aefb98ca30 Mon Sep 17 00:00:00 2001 From: Dejan Lozanovic Date: Fri, 14 Feb 2025 10:00:59 +0000 Subject: [PATCH] sql alchemy creating constrains --- src/matchbox/server/postgresql/orm.py | 308 ++++++++++----------- src/matchbox/server/postgresql/utils/db.py | 16 +- 2 files changed, 155 insertions(+), 169 deletions(-) diff --git a/src/matchbox/server/postgresql/orm.py b/src/matchbox/server/postgresql/orm.py index c38897c2..93a1f336 100644 --- a/src/matchbox/server/postgresql/orm.py +++ b/src/matchbox/server/postgresql/orm.py @@ -22,28 +22,27 @@ class ResolutionFrom(CountMixin, MBDB.MatchboxBase): """Resolution lineage closure table with cached truth values.""" - def __init__(self, suffix=""): - self.__tablename__ = f"resolution_from{suffix}" - - # Columns - self.parent = Column( - BIGINT, - ForeignKey(f"resolutions{suffix}.resolution_id", ondelete="CASCADE"), - primary_key=True, - ) - self.child = Column( - BIGINT, - ForeignKey(f"resolutions{suffix}.resolution_id", ondelete="CASCADE"), - primary_key=True, - ) - self.level = Column(INTEGER, nullable=False) - self.truth_cache = Column(FLOAT, nullable=True) - - # Constraints - self.__table_args__ = ( - CheckConstraint("parent != child", name="no_self_reference"), - CheckConstraint("level > 0", name="positive_level"), - ) + __tablename__ = "resolution_from" + + # Columns + parent = Column( + BIGINT, + ForeignKey("resolutions.resolution_id", ondelete="CASCADE"), + primary_key=True, + ) + child = Column( + BIGINT, + ForeignKey("resolutions.resolution_id", ondelete="CASCADE"), + primary_key=True, + ) + level = Column(INTEGER, nullable=False) + truth_cache = Column(FLOAT, nullable=True) + + # Constraints + __table_args__ = ( + CheckConstraint("parent != child", name="no_self_reference"), + CheckConstraint("level > 0", name="positive_level"), + ) class Resolutions(CountMixin, MBDB.MatchboxBase): @@ -52,39 +51,38 @@ class Resolutions(CountMixin, MBDB.MatchboxBase): Resolutions produce probabilities or own data in the clusters table. """ - def __init__(self, suffix=""): - self.__tablename__ = f"resolutions{suffix}" - - # Columns - self.resolution_id = Column(BIGINT, primary_key=True) - self.resolution_hash = Column(BYTEA, nullable=False) - self.type = Column(TEXT, nullable=False) - self.name = Column(TEXT, nullable=False) - self.description = Column(TEXT) - self.truth = Column(FLOAT) - - # Relationships - self.source = relationship("Sources", back_populates="dataset_resolution", uselist=False) - self.probabilities = relationship( - "Probabilities", back_populates="proposed_by", cascade="all, delete-orphan" - ) - self.children = relationship( - "Resolutions", - secondary=ResolutionFrom.__table__, - primaryjoin="Resolutions.resolution_id == ResolutionFrom.parent", - secondaryjoin="Resolutions.resolution_id == ResolutionFrom.child", - backref="parents", - ) - - # Constraints - self.__table_args__ = ( - CheckConstraint( - "type IN ('model', 'dataset', 'human')", - name="resolution_type_constraints", - ), - UniqueConstraint("resolution_hash", name="resolutions_hash_key"), - UniqueConstraint("name", name="resolutions_name_key"), - ) + __tablename__ = "resolutions" + + # Columns + resolution_id = Column(BIGINT, primary_key=True) + resolution_hash = Column(BYTEA, nullable=False) + type = Column(TEXT, nullable=False) + name = Column(TEXT, nullable=False) + description = Column(TEXT) + truth = Column(FLOAT) + + # Relationships + source = relationship("Sources", back_populates="dataset_resolution", uselist=False) + probabilities = relationship( + "Probabilities", back_populates="proposed_by", cascade="all, delete-orphan" + ) + children = relationship( + "Resolutions", + secondary=ResolutionFrom.__table__, + primaryjoin="Resolutions.resolution_id == ResolutionFrom.parent", + secondaryjoin="Resolutions.resolution_id == ResolutionFrom.child", + backref="parents", + ) + + # Constraints + __table_args__ = ( + CheckConstraint( + "type IN ('model', 'dataset', 'human')", + name="resolution_type_constraints", + ), + UniqueConstraint("resolution_hash", name="resolutions_hash_key"), + UniqueConstraint("name", name="resolutions_name_key"), + ) @property def ancestors(self) -> set["Resolutions"]: @@ -173,31 +171,30 @@ def next_id(cls) -> int: class Sources(CountMixin, MBDB.MatchboxBase): """Table of sources of data for Matchbox.""" - def __init__(self, suffix="", contains_temporary=False): - self.__tablename__ = "sources" - - # Columns - self.resolution_id = Column( - BIGINT, - ForeignKey("resolutions.resolution_id", ondelete="CASCADE"), - primary_key=True, - ) - self.alias = Column(TEXT, nullable=False) - self.full_name = Column(TEXT, nullable=False) - self.warehouse_hash = Column(BYTEA, nullable=False) - self.id = Column(TEXT, nullable=False) - self.column_names = Column(ARRAY(TEXT), nullable=False) - self.column_aliases = Column(ARRAY(TEXT), nullable=False) - self.column_types = Column(ARRAY(TEXT), nullable=False) - - # Relationships - self.dataset_resolution = relationship("Resolutions", back_populates="source") - self.clusters = relationship("Clusters", back_populates="source") - - # Constraints - self.__table_args__ = ( - UniqueConstraint("full_name", "warehouse_hash", name="unique_source_address"), - ) + __tablename__ = "sources" + + # Columns + resolution_id = Column( + BIGINT, + ForeignKey("resolutions.resolution_id", ondelete="CASCADE"), + primary_key=True, + ) + alias = Column(TEXT, nullable=False) + full_name = Column(TEXT, nullable=False) + warehouse_hash = Column(BYTEA, nullable=False) + id = Column(TEXT, nullable=False) + column_names = Column(ARRAY(TEXT), nullable=False) + column_aliases = Column(ARRAY(TEXT), nullable=False) + column_types = Column(ARRAY(TEXT), nullable=False) + + # Relationships + dataset_resolution = relationship("Resolutions", back_populates="source") + clusters = relationship("Clusters", back_populates="source") + + # Constraints + __table_args__ = ( + UniqueConstraint("full_name", "warehouse_hash", name="unique_source_address"), + ) @classmethod def list(cls) -> list["Sources"]: @@ -208,69 +205,56 @@ def list(cls) -> list["Sources"]: class Contains(CountMixin, MBDB.MatchboxBase): """Cluster lineage table.""" - def __init__(self, suffix="", clusters_temporary=False): - self.__tablename__ = f"contains{suffix}" + __tablename__ = "contains" - # Columns - if clusters_temporary: - clusters_name = f"clusters{suffix}" - else: - clusters_name = "clusters" + # Columns + parent = Column( + BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True + ) + child = Column( + BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True + ) - self.parent = Column( - BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True - ) - self.child = Column( - BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True - ) - - # Constraints and indices - self.__table_args__ = ( - CheckConstraint("parent != child", name="no_self_containment"), - Index(f"ix_contains_parent_child{suffix}", "parent", "child"), - Index(f"ix_contains_child_parent", "child", "parent"), - ) + # Constraints and indices + __table_args__ = ( + CheckConstraint("parent != child", name="no_self_containment"), + Index("ix_contains_parent_child", "parent", "child"), + Index("ix_contains_child_parent", "child", "parent"), + ) class Clusters(CountMixin, MBDB.MatchboxBase): """Table of indexed data and clusters that match it.""" - def __init__(self, suffix="", contains_temporary=False): - self.__tablename__ = f"clusters{suffix}" - - # Columns - self.cluster_id = Column(BIGINT, primary_key=True) - self.cluster_hash = Column(BYTEA, nullable=False) - - self.dataset = Column(BIGINT, ForeignKey("sources.resolution_id"), nullable=True) - # Uses array as source data may have identical rows. We can't control this - # Must be indexed or PostgreSQL incorrectly tries to use nested joins - # when retrieving small datasets in query() -- extremely slow - self.source_pk = Column(ARRAY(TEXT), index=True, nullable=True) - - # Relationships - self.source = relationship("Sources", back_populates="clusters") - self.probabilities = relationship( - "Probabilities", back_populates="proposes", cascade="all, delete-orphan" - ) - - if contains_temporary: - contains_name = f"{Contains.__table__}{suffix}" - else: - contains_name = Contains.__table__ - self.children = relationship( - "Clusters", - secondary=contains_name, - primaryjoin="Clusters.cluster_id == Contains.parent", - secondaryjoin="Clusters.cluster_id == Contains.child", - backref="parents", - ) - - # Constraints and indices - self.__table_args__ = ( - Index(f"ix_clusters_id_gin{suffix}", self.source_pk, postgresql_using="gin"), - UniqueConstraint("cluster_hash", name=f"clusters_hash_key{suffix}"), - ) + __tablename__ = "clusters" + + # Columns + cluster_id = Column(BIGINT, primary_key=True) + cluster_hash = Column(BYTEA, nullable=False) + dataset = Column(BIGINT, ForeignKey("sources.resolution_id"), nullable=True) + # Uses array as source data may have identical rows. We can't control this + # Must be indexed or PostgreSQL incorrectly tries to use nested joins + # when retrieving small datasets in query() -- extremely slow + source_pk = Column(ARRAY(TEXT), index=True, nullable=True) + + # Relationships + source = relationship("Sources", back_populates="clusters") + probabilities = relationship( + "Probabilities", back_populates="proposes", cascade="all, delete-orphan" + ) + children = relationship( + "Clusters", + secondary=Contains.__table__, + primaryjoin="Clusters.cluster_id == Contains.parent", + secondaryjoin="Clusters.cluster_id == Contains.child", + backref="parents", + ) + + # Constraints and indices + __table_args__ = ( + Index("ix_clusters_id_gin", source_pk, postgresql_using="gin"), + UniqueConstraint("cluster_hash", name="clusters_hash_key"), + ) @classmethod def next_id(cls) -> int: @@ -285,34 +269,24 @@ def next_id(cls) -> int: class Probabilities(CountMixin, MBDB.MatchboxBase): """Table of probabilities that a cluster is correct, according to a resolution.""" - def __init__(self, suffix="", resolutions_temporary=False, clusters_temporary=False): - self.__tablename__ = f"probabilities{suffix}" - - # Columns - if resolutions_temporary: - resolutions_name = f"resolutions{suffix}" - else: - resolutions_name = "resolutions" - if clusters_temporary: - clusters_name = f"clusters{suffix}" - else: - clusters_name = "clusters" - - self.resolution = Column( - BIGINT, - ForeignKey(f"{resolutions_name}.resolution_id", ondelete="CASCADE"), - primary_key=True, - ) - self.cluster = Column( - BIGINT, ForeignKey(f"{clusters_name}.cluster_id", ondelete="CASCADE"), primary_key=True - ) - self.probability = Column(SMALLINT, nullable=False) - - # Relationships - self.proposed_by = relationship("Resolutions", back_populates="probabilities") - self.proposes = relationship("Clusters", back_populates="probabilities") - - # Constraints - self.__table_args__ = ( - CheckConstraint("probability BETWEEN 0 AND 100", name="valid_probability"), - ) + __tablename__ = "probabilities" + + # Columns + resolution = Column( + BIGINT, + ForeignKey("resolutions.resolution_id", ondelete="CASCADE"), + primary_key=True, + ) + cluster = Column( + BIGINT, ForeignKey("clusters.cluster_id", ondelete="CASCADE"), primary_key=True + ) + probability = Column(SMALLINT, nullable=False) + + # Relationships + proposed_by = relationship("Resolutions", back_populates="probabilities") + proposes = relationship("Clusters", back_populates="probabilities") + + # Constraints + __table_args__ = ( + CheckConstraint("probability BETWEEN 0 AND 100", name="valid_probability"), + ) diff --git a/src/matchbox/server/postgresql/utils/db.py b/src/matchbox/server/postgresql/utils/db.py index 71c4144a..1db04c12 100644 --- a/src/matchbox/server/postgresql/utils/db.py +++ b/src/matchbox/server/postgresql/utils/db.py @@ -18,6 +18,8 @@ from sqlalchemy.orm import DeclarativeMeta, Session from sqlalchemy.exc import DatabaseError as AlchemyDatabaseError +from matchbox.server.postgresql.db import MBDB + from matchbox.common.graph import ( ResolutionEdge, ResolutionGraph, @@ -200,7 +202,7 @@ def _create_adbc_table_constraints(db_schema:str, sufix:str, conn:Connection) -> """ # Cluster - _run_queries([ + statements = [ f"""DROP TABLE IF EXISTS {db_schema}.clusters""", f"""DROP TABLE IF EXISTS {db_schema}.contains""", f"""DROP TABLE IF EXISTS {db_schema}.probabilities""", @@ -208,7 +210,17 @@ def _create_adbc_table_constraints(db_schema:str, sufix:str, conn:Connection) -> f"""ALTER TABLE {db_schema}.clusters_{sufix} RENAME TO clusters""", f"""ALTER TABLE {db_schema}.contains_{sufix} RENAME TO contains""", f"""ALTER TABLE {db_schema}.probabilities_{sufix} RENAME TO probabilities""" - ], conn) + ] + #start the transaction + conn.begin() + for query in statements: + conn.execute(text(query)) + + MBDB.MatchboxBase.metadata.create_all(conn) + + conn.commit() + + return True def _adbc_insert_data(clusters:pa.Table, contains:pa.Table, probabilities:pa.Table, suffix:str, alchemy_conn:Connection, resolution_id:int) -> bool: