From ceef6c1d1a651f1e1250665408ae43b912299679 Mon Sep 17 00:00:00 2001 From: Tharos Date: Fri, 8 Dec 2023 16:09:58 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fixes=20on=20`load=5Fgraph`=20an?= =?UTF-8?q?d=20`save=5Fgraph`=20for=20JSON=20strings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pgGraphs/gfaparser.py | 29 ++++++++++++++++++++++++----- pgGraphs/graph.py | 38 ++++++++++++++++++++------------------ pyproject.toml | 2 +- setup.py | 2 +- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/pgGraphs/gfaparser.py b/pgGraphs/gfaparser.py index fa753fe..58a3bd2 100644 --- a/pgGraphs/gfaparser.py +++ b/pgGraphs/gfaparser.py @@ -106,6 +106,25 @@ def get_gfa_type(tag_type: str) -> type | Callable: raise ValueError( f"Type identifier {tag_type} is not in the GFA standard") + @staticmethod + def set_gfa_type(tag_type: str) -> type | Callable: + """Gives the serializer that converts a Python value to its GFA string representation + + Args: + tag_type (str): the letter that identifies the GFA data type + + Raises: + NotImplementedError: happens if it's an array or byte array (needs doc) + ValueError: happens if format is not in GFA standards + + Returns: + type | Callable: the cast method or type to apply + """ + if tag_type == 'J': + return dumps + else: + return str + @staticmethod def get_python_type(data: object) -> str: """Interprets tags of GFA as a Python-compatible format @@ -224,16 +243,16 @@ def save_graph(graph, output_path: str) -> None: if graph.headers: for header in graph.headers: gfa_writer.write( - "H\t"+'\t'.join([f"{key}:{GFAParser.get_python_type(value)}:{value}" if not key.startswith('ARG') else str(value) for key, value in header.items()])+"\n") + "H\t"+'\t'.join([f"{key}:{GFAParser.get_python_type(value)}:{GFAParser.set_gfa_type(GFAParser.get_python_type(value))(value)}" if not key.startswith('ARG') else str(value) for key, value in header.items()])+"\n") if graph.segments: for segment_name, segment_datas in
graph.segments.items(): gfa_writer.write("S\t"+f"{segment_name}\t{segment_datas['seq'] if 'seq' in segment_datas else 'N'*segment_datas['length']}\t" + '\t'.join( - [f"{key}:{GFAParser.get_python_type(value)}:{value}" if not key.startswith('ARG') else str(value) for key, value in segment_datas.items() if key not in ['length', 'seq']])+"\n") + [f"{key}:{GFAParser.get_python_type(value)}:{GFAParser.set_gfa_type(GFAParser.get_python_type(value))(value)}" if not key.startswith('ARG') else str(value) for key, value in segment_datas.items() if key not in ['length', 'seq']])+"\n") if graph.lines: - for line in graph.lines: + for line in graph.lines.values(): ori1, ori2 = line['orientation'].split('/') gfa_writer.write(f"L\t"+f"{line['start']}\t{ori1}\t{line['end']}\t{ori2}\t" + '\t'.join( - [f"{key}:{GFAParser.get_python_type(value)}:{value}" if not key.startswith('ARG') else str(value) for key, value in line.items() if key not in ['orientation', 'start', 'end']])+"\n") + [f"{key}:{GFAParser.get_python_type(value)}:{GFAParser.set_gfa_type(GFAParser.get_python_type(value))(value)}" if not key.startswith('ARG') else str(value) for key, value in line.items() if key not in ['orientation', 'start', 'end']])+"\n") if graph.paths: for path_name, path_datas in graph.paths.items(): if graph.metadata['version'] == GFAFormat.GFA1: # P-line @@ -245,5 +264,5 @@ def save_graph(graph, output_path: str) -> None: offset_stop: int | str = path_datas['stop_offset'] if 'stop_offset' in path_datas else '?' 
strpath: str = ''.join( [f"{'>' if orient == Orientation.FORWARD else '<'}{node_name}" for node_name, orient in path_datas['path']]) - return f"W\t{path_name}\t{path_datas['origin'] if 'origin' in path_datas else line_number}\t{path_datas['name']}\t{offset_start}\t{offset_stop}\t{strpath}\t*\n" + return f"W\t{path_name}\t{path_datas['origin'] if 'origin' in path_datas else line_number}\t{path_name}\t{offset_start}\t{offset_stop}\t{strpath}\t*\n" line_number += 1 diff --git a/pgGraphs/graph.py b/pgGraphs/graph.py index f40407b..c14c9dd 100644 --- a/pgGraphs/graph.py +++ b/pgGraphs/graph.py @@ -44,7 +44,7 @@ def __init__(self, gfa_file: str | None = None, with_sequence: bool = True) -> N ) name, line_type, datas = GFAParser.read_gfa_line( - gfa_line.split(), with_sequence) + gfa_line.split('\t'), with_sequence) match line_type: case GFALine.SEGMENT: self.segments[name] = datas @@ -340,7 +340,7 @@ def merge_segments( ############### POsitionnal tag ############### - def sequence_offsets(self) -> None: + def sequence_offsets(self, recalculate: bool = False) -> None: """ Calculates the offsets within each path for each node Here, we aim to extend the current GFA tag format by adding tags @@ -353,19 +353,21 @@ def sequence_offsets(self) -> None: Note that any non-referenced walk in this field means that the node is not inside the given walk. 
""" - for walk_name, walk_datas in self.paths.items(): - start_offset: int = int( - walk_datas['start_offset']) if 'start_offset' in walk_datas.keys() else 0 - for node, vect in walk_datas["path"]: - if 'PO' not in self.segments[node]: - self.segments[node]['PO']: dict[str, - list[tuple[int, int, Orientation]]] = dict() - if walk_name in self.segments[node]['PO']: - # We already encountered the node in this path - self.segments[node]['PO'][walk_name].append( - (start_offset, start_offset+self.segments[node]['length'], vect.value)) - else: - # First time we encounter this node for this path - self.segments[node]['PO'][walk_name] = [ - (start_offset, start_offset+self.segments[node]['length'], vect.value)] - start_offset += self.segments[node]['length'] + if not 'PO' in self.metadata or recalculate: + for walk_name, walk_datas in self.paths.items(): + start_offset: int = int( + walk_datas['start_offset']) if 'start_offset' in walk_datas.keys() and isinstance(walk_datas['start_offset'], int) is None else 0 + for node, vect in walk_datas["path"]: + if 'PO' not in self.segments[node]: + self.segments[node]['PO']: dict[str, + list[tuple[int, int, Orientation]]] = dict() + if walk_name in self.segments[node]['PO']: + # We already encountered the node in this path + self.segments[node]['PO'][walk_name].append( + (start_offset, start_offset+self.segments[node]['length'], vect.value)) + else: + # First time we encounter this node for this path + self.segments[node]['PO'][walk_name] = [ + (start_offset, start_offset+self.segments[node]['length'], vect.value)] + start_offset += self.segments[node]['length'] + self.metadata['PO'] = True diff --git a/pyproject.toml b/pyproject.toml index 0214c67..6c044c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ [project] name = "gfagraphs" - version = "0.2.2" + version = "0.2.9" authors = [ { name="Siegfried Dubois", email="siegfried.dubois@inria.fr" }, ] diff --git a/setup.py b/setup.py index 44d5561..7923adc 100644 --- 
a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ DESCRIPTION: str = "Library to parse, edit and handle in memory GFA graphs" REQUIRED_PYTHON: tuple = (3, 10) OVERRIDE_VN: bool = True -VN: str = "0.2.2" +VN: str = "0.2.9" URL: str = "https://github.com/Tharos-ux/gfagraphs" REQUIREMENTS: list[str] = ['networkx', 'tharos-pytools']