Skip to content

Commit

Permalink
Merge pull request #1557 from strictdoc-project/stanislaw/trace_refac…
Browse files Browse the repository at this point in the history
…toring

traceability_index: database: simplify handling of sections, anchors and links
  • Loading branch information
stanislaw authored Dec 30, 2023
2 parents 402f856 + 7c98c17 commit 2eeeb86
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 230 deletions.
34 changes: 25 additions & 9 deletions strictdoc/core/graph/validations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from strictdoc.backend.sdoc.models.anchor import Anchor
from strictdoc.core.graph_database import (
ConstraintViolation,
Expand All @@ -8,22 +10,36 @@


class RemoveNodeValidation:
def validate(self, database: GraphDatabase, uuid: MID):
node = database.get_node_by_mid(uuid)
if isinstance(node, Anchor):
self.validate_anchor(database, node)
"""
FIXME: This class is not tested. Also, most of the code should rather be
in the "can perform operation?"-kind of class.
"""

def validate(
self, database: GraphDatabase, link_type: GraphLinkType, lhs_node: Any
):
if link_type == GraphLinkType.MID_TO_NODE:
assert isinstance(lhs_node, MID)
node = database.get_link_value(
link_type=GraphLinkType.MID_TO_NODE,
lhs_node=lhs_node,
weak=False,
)
if isinstance(node, Anchor):
self.validate_anchor(database, node)

@staticmethod
def validate_anchor(database: GraphDatabase, anchor: Anchor):
assert isinstance(anchor, Anchor)

existing_link = database.get_link_value_weak(
link_type=GraphLinkType.SECTIONS_TO_INCOMING_LINKS,
lhs_node=anchor.value,
existing_links = database.get_link_values(
link_type=GraphLinkType.NODE_TO_INCOMING_LINKS,
lhs_node=anchor.reserved_mid,
weak=True,
)

if existing_link is not None:
if existing_links is not None:
raise ConstraintViolation(
f"Cannot delete anchor {anchor} because it has incoming links: "
f"{anchor.value} -> {existing_link}"
f"{anchor.value} -> {existing_links}"
)
229 changes: 89 additions & 140 deletions strictdoc/core/graph_database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import defaultdict
from enum import IntEnum
from typing import Any, Dict, List, Optional
from enum import Enum
from typing import Any, Dict, Optional

from strictdoc.helpers.mid import MID
from strictdoc.helpers.ordered_set import OrderedSet
Expand All @@ -11,14 +11,20 @@ def __init__(self, message):
super().__init__(message)


class LinkType(IntEnum):
pass
class LinkType(Enum):
def __init__(self, rank, relation_type, lhs_type, rhs_type):
assert relation_type in ("ONE_TO_ONE", "ONE_TO_MANY")
self.rank = rank
self.relation_type: str = relation_type
self.lhs_type = lhs_type
self.rhs_type = rhs_type

def is_one_to_one(self):
return self.relation_type == "ONE_TO_ONE"


class GraphDatabase:
def __init__(self):
self._map_mid_to_node: Dict[MID, Any] = {}
self._map_uid_to_node: Dict[str, Any] = {}
self._links: Dict[LinkType, Dict[MID, OrderedSet[MID]]] = defaultdict(
dict
)
Expand Down Expand Up @@ -51,167 +57,110 @@ def dump(self) -> str:
result += f"- RHS MID: {rhs_mid}\n"
return result

def add_node_by_mid(self, mid: MID, uid: Optional[str], node: Any):
assert isinstance(mid, MID), mid
assert mid not in self._map_mid_to_node, (self._map_mid_to_node, node)
self._map_mid_to_node[mid] = node
if uid is not None:
self._map_uid_to_node[uid] = node

def get_node_by_mid(self, mid: MID) -> Any:
assert isinstance(mid, MID), mid
if mid not in self._map_mid_to_node:
raise LookupError
return self._map_mid_to_node[mid]

def get_node_by_mid_weak(self, mid: MID) -> Optional[Any]:
assert isinstance(mid, MID), mid
if mid not in self._map_mid_to_node:
return None
return self._map_mid_to_node[mid]

def get_node_by_uid(self, uid: str) -> Any:
assert isinstance(uid, str), uid
if uid not in self._map_uid_to_node:
raise LookupError
return self._map_uid_to_node[uid]

def get_node_by_uid_weak(self, uid: str) -> Optional[Any]:
assert isinstance(uid, str), uid
if uid not in self._map_uid_to_node:
return None
return self._map_uid_to_node[uid]

def remove_node_by_mid(self, mid: MID):
assert isinstance(mid, MID), mid
if mid not in self._map_mid_to_node:
raise LookupError
if self.remove_node_validation is not None:
self.remove_node_validation.validate(self, mid)
node_to_remove = self._map_mid_to_node
del self._map_mid_to_node[mid]

uids_to_remove = []
for uid_, node_ in self._map_uid_to_node.items():
if node_ == node_to_remove:
uids_to_remove.append(uid_)
assert len(uids_to_remove) <= 1
for uid_to_remove_ in uids_to_remove:
del self._map_uid_to_node[uid_to_remove_]

def get_link_values(self, *, link_type: LinkType, lhs_node: Any) -> List:
def has_link(self, *, link_type: LinkType, lhs_node: Any) -> bool:
if link_type not in self._links:
raise LookupError
links: Dict[MID, OrderedSet[MID]] = self._links[link_type]
if lhs_node.reserved_mid not in links:
raise LookupError
rhs_mids = self._links[link_type][lhs_node.reserved_mid]
if len(rhs_mids) == 0:
raise LookupError
rhs_nodes = []
for rhs_mid in rhs_mids:
rhs_nodes.append(self.get_node_by_mid(rhs_mid))
return rhs_nodes
return False
links = self._links[link_type]
if lhs_node not in links:
return False
return True

def get_link_value_weak(
self, *, link_type: LinkType, lhs_node: Any
def get_link_value(
self, *, link_type: LinkType, lhs_node: Any, weak: bool
) -> Optional[Any]:
link_values = self.get_link_values_weak(
link_type=link_type, lhs_node=lhs_node
link_values = self.get_link_values(
link_type=link_type, lhs_node=lhs_node, weak=True
)
if link_values is not None and len(link_values) > 0:
assert len(link_values) == 1
return next(iter(link_values))
return None
if weak:
return None
raise LookupError

def get_link_values_weak(
self, *, link_type: LinkType, lhs_node: Any
) -> Optional[List]:
def get_link_values(
self, *, link_type: LinkType, lhs_node: Any, weak: bool
) -> Optional[OrderedSet]:
if link_type not in self._links:
return None
if weak:
return None
raise LookupError
links = self._links[link_type]
if lhs_node.reserved_mid not in links:
if lhs_node not in links:
return None
rhs_nodes = []
for rhs_mid in links[lhs_node.reserved_mid]:
rhs_nodes.append(self.get_node_by_mid(rhs_mid))
return rhs_nodes

def get_link_values_reverse_weak(
self, *, link_type: LinkType, rhs_node: Any
) -> Optional[List]:
return links[lhs_node]

def get_link_values_reverse(
self, *, link_type: LinkType, rhs_node: Any, weak: bool
) -> Optional[OrderedSet]:
if link_type not in self._links_reverse:
return None
if weak:
return None
raise LookupError
reverse_links = self._links_reverse[link_type]
if rhs_node.reserved_mid not in reverse_links:
return None
lhs_nodes = []
for lhs_mid_ in reverse_links[rhs_node.reserved_mid]:
lhs_nodes.append(self.get_node_by_mid(lhs_mid_))
return lhs_nodes

def link_exists(self, *, link_type: LinkType, lhs_node: Any) -> bool:
if link_type not in self._links:
return False
links = self._links[link_type]
if lhs_node.reserved_mid not in links:
return False
return True
if rhs_node not in reverse_links:
if weak:
return None
raise LookupError
return reverse_links[rhs_node]

def node_with_uid_exists(self, *, uid: str) -> bool:
return uid in self._map_uid_to_node
def create_link(self, *, link_type: LinkType, lhs_node: Any, rhs_node: Any):
if not isinstance(lhs_node, link_type.lhs_type):
raise TypeError(
f"Type mismatch: {type(lhs_node)} {link_type.lhs_type}"
)

def add_link(self, *, link_type: LinkType, lhs_node: Any, rhs_node: Any):
assert lhs_node != rhs_node, (lhs_node, rhs_node)
assert lhs_node.reserved_mid != rhs_node.reserved_mid, (
lhs_node,
rhs_node,
)
assert not isinstance(lhs_node, MID)
assert not isinstance(rhs_node, MID)
assert isinstance(lhs_node.reserved_mid, MID)
assert isinstance(rhs_node.reserved_mid, MID)

self._map_mid_to_node[lhs_node.reserved_mid] = lhs_node
self._map_mid_to_node[rhs_node.reserved_mid] = rhs_node

links = self._links[link_type]
if lhs_node.reserved_mid in links:
assert rhs_node.reserved_mid not in links[lhs_node.reserved_mid]
reverse_links = self._links_reverse[link_type]

if lhs_node.reserved_mid not in links:
links[lhs_node.reserved_mid] = OrderedSet([rhs_node.reserved_mid])
if lhs_node not in links:
links[lhs_node] = OrderedSet([rhs_node])
else:
links[lhs_node.reserved_mid].add(rhs_node.reserved_mid)

if rhs_node.reserved_mid not in reverse_links:
reverse_links[rhs_node.reserved_mid] = OrderedSet(
[lhs_node.reserved_mid]
)
if link_type.is_one_to_one():
raise TypeError(
"The LHS node already exists for this one-to-one relation. "
"Cannot insert another one. "
f"Link type: {link_type}, "
f"LHS node: {lhs_node}, "
f"RHS node: {rhs_node}."
)
links[lhs_node].add(rhs_node)

if rhs_node not in reverse_links:
reverse_links[rhs_node] = OrderedSet([lhs_node])
else:
reverse_links[rhs_node.reserved_mid].add(lhs_node.reserved_mid)
reverse_links[rhs_node].add(lhs_node)

def remove_link(
def delete_link(
self,
*,
link_type: LinkType,
lhs_node: Any,
rhs_node: Any,
remove_lhs_node: bool,
remove_rhs_node: bool,
):
assert link_type in self._links
assert isinstance(lhs_node.reserved_mid, MID), lhs_node
assert isinstance(rhs_node.reserved_mid, MID), rhs_node
assert lhs_node.reserved_mid in self._links[link_type]
self._links[link_type][lhs_node.reserved_mid].remove(
rhs_node.reserved_mid
)
self._links_reverse[link_type][rhs_node.reserved_mid].remove(
lhs_node.reserved_mid
)
if remove_lhs_node:
self.remove_node_by_mid(lhs_node.reserved_mid)
if remove_rhs_node:
self.remove_node_by_mid(rhs_node.reserved_mid)
assert lhs_node in self._links[link_type]
if self.remove_node_validation is not None:
self.remove_node_validation.validate(self, link_type, lhs_node)

self._links[link_type][lhs_node].remove(rhs_node)
self._links_reverse[link_type][rhs_node].remove(lhs_node)

def delete_all_links(
self,
*,
link_type: LinkType,
lhs_node: Any,
):
assert link_type in self._links
assert lhs_node in self._links[link_type]
if self.remove_node_validation is not None:
self.remove_node_validation.validate(self, link_type, lhs_node)

existing_rhs_nodes = self._links[link_type][lhs_node]
for existing_rhs_node_ in existing_rhs_nodes:
self._links_reverse[link_type][existing_rhs_node_].remove(lhs_node)

del self._links[link_type][lhs_node]
Loading

0 comments on commit 2eeeb86

Please sign in to comment.