Skip to content

Commit

Permalink
Lib structure update
Browse files Browse the repository at this point in the history
  • Loading branch information
dubssieg committed Nov 28, 2023
1 parent 72b9825 commit 17c96f2
Show file tree
Hide file tree
Showing 7 changed files with 832 additions and 4 deletions.
78 changes: 74 additions & 4 deletions gfagraphs/gfagraphs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"Tools to represent GFA format"
from os.path import exists
from os import stat
from os import path, stat
from enum import Enum
from re import sub, match
from typing import Callable
Expand All @@ -11,6 +10,60 @@
from tharospytools.matplotlib_tools import get_palette


def get_gfa_subtype(gfa_file_path: str | list[str]) -> str | list[str]:
    """Identify the GFA flavour of one or several files from their first line.

    A file whose first line does not start with 'H', or whose header carries no
    VN tag, is reported as 'rGFA'. Otherwise the VN value selects between
    'GFA1', 'GFA1.1', 'GFA1.2' and 'GFA2'; any other value gives 'unknown'.

    Args:
        gfa_file_path (str | list[str]): one or more file paths

    Returns:
        str | list[str]: a gfa subtype descriptor per input file; when exactly
            one file was inspected, the bare string is returned instead of a list

    Raises:
        OSError: a path does not exist
        IOError: a path does not end with '.gfa', or the file is empty
    """
    # (VN tag value, descriptor) pairs, tested in order with `==`
    version_labels: tuple[tuple[str, str], ...] = (
        ('1.0', 'GFA1'),
        ('1.1', 'GFA1.1'),
        ('1.2', 'GFA1.2'),
        ('2.0', 'GFA2'),
    )
    candidates: list[str] = (
        [gfa_file_path] if isinstance(gfa_file_path, str) else gfa_file_path
    )
    detected: list[str] = []
    for candidate in candidates:
        # The three sanity checks run in this order: existence, extension, size
        if not path.exists(candidate):
            raise OSError(
                "Specified file does not exists. Please check provided path."
            )
        if not candidate.endswith('.gfa'):
            raise IOError(
                "File descriptor is invalid. Please check format, this lib is designed to work with Graphical Fragment Assembly (GFA) files."
            )
        if stat(candidate).st_size == 0:
            raise IOError(
                "File is empty."
            )
        # Only the first line is needed to decide
        with open(candidate, 'r', encoding='utf-8') as handle:
            first_line: str = handle.readline()
        if first_line[0] != 'H':
            detected.append('rGFA')
            continue
        try:
            declared_version = supplementary_datas(
                first_line.strip('\n').split('\t'), 1
            )["VN"]
        except KeyError:
            # Header line without a VN tag
            label: str = 'rGFA'
        else:
            label = next(
                (
                    name
                    for known, name in version_labels
                    if declared_version == known
                ),
                'unknown',
            )
        detected.append(label)
    return detected[0] if len(detected) == 1 else detected


def gtype(tag_type: str) -> type | Callable:
"""Interprets tags of GFA as a Python-compatible format
Expand Down Expand Up @@ -413,7 +466,7 @@ def __init__(self, gfa_file: str | None = None, gfa_type: str = 'unknown', with_
if gfa_file:
# We try to load file from disk
# Checking if path exists
if not exists(gfa_file):
if not path.exists(gfa_file):
raise OSError(
"Specified file does not exists. Please check provided path."
)
Expand All @@ -427,6 +480,7 @@ def __init__(self, gfa_file: str | None = None, gfa_type: str = 'unknown', with_
raise IOError(
"File is empty."
)

# All lines shall start by a captial letter (see GFAspec). If not, we raise ValueError
with open(gfa_file, 'r', encoding='utf-8') as gfa_reader:
for gfa_line in gfa_reader:
Expand All @@ -438,14 +492,30 @@ def __init__(self, gfa_file: str | None = None, gfa_type: str = 'unknown', with_
# We parse the GFA line with the record class
record: Record = Record(
gfa_line,
gfa_type,
self.version.value,
{
'ws': with_sequence
}
)
# We put record in the right list
if isinstance(record, Header):
self.headers.append(record)
try:
version_number: str = supplementary_datas(
gfa_line.strip('\n').split('\t'), 1
)["VN"]
if version_number == '1.0':
self.version = GfaStyle('GFA1')
elif version_number == '1.1':
self.version = GfaStyle('GFA1.1')
elif version_number == '1.2':
self.version = GfaStyle('GFA1.2')
elif version_number == '2.0':
self.version = GfaStyle('GFA2')
else:
self.version = GfaStyle('unknown')
except KeyError:
self.version = GfaStyle('rGFA')
elif isinstance(record, Segment):
self.segments.append(record)
elif isinstance(record, Line):
Expand Down
6 changes: 6 additions & 0 deletions pgGraphs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'Abstraction layer for GFA format'
from .abstractions import GFALine, GFAFormat, Orientation
from .gfaparser import GFAParser
from .graph import Graph
from .io import GFAIO
from .networkx import GFANetwork
29 changes: 29 additions & 0 deletions pgGraphs/abstractions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"Abstractions over GFA formats"
from enum import Enum, auto


class Orientation(Enum):
    """Describes the way a node is read.

    Each member's value is the one-character symbol that stands for the
    orientation, so a member can be looked up from that symbol with
    ``Orientation('+')`` or ``Orientation('-')``.
    """
    FORWARD = '+'
    REVERSE = '-'
    # A plain string value: combining a str with `auto()` through `|` is not a
    # supported operation and raised TypeError as soon as the class was defined.
    ANY = '?'


class GFAFormat(Enum):
    """Describes the different possible gfa-like formats.

    Each member's value is the textual descriptor of the format, so a member
    can be looked up from it, e.g. ``GFAFormat('GFA1.1')``.
    """
    RGFA = 'rGFA'
    GFA1 = 'GFA1'
    GFA1_1 = 'GFA1.1'
    GFA1_2 = 'GFA1.2'
    GFA2 = 'GFA2'
    # A plain string value: combining a str with `auto()` through `|` is not a
    # supported operation and raised TypeError as soon as the class was defined.
    ANY = 'unknown'


class GFALine(Enum):
    """Describes the different GFA line formats.

    The value of each named line type is the letter that opens such a line,
    so a member can be looked up from it, e.g. ``GFALine('S')``.
    """
    SEGMENT = 'S'
    LINE = 'L'
    WALK = 'W'
    PATH = 'P'
    HEADER = 'H'
    # Explicit value instead of `auto()`: the members above are strings, which
    # `auto()` cannot increment. Older interpreters fell back to 1 here, so 1
    # is kept; recent ones raise TypeError instead (see the enum docs).
    ANY = 1
210 changes: 210 additions & 0 deletions pgGraphs/gfaparser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
""
from re import match, sub
from typing import Callable
from json import loads, dumps
from os import path, stat
from abstractions import Orientation, GFALine


class GFAParser:
"""This class implements static methods to get informations about the contents of a GFA file, and to parse them.
Raises:
OSError: _description_
IOError: _description_
IOError: _description_
NotImplementedError: _description_
ValueError: _description_
ValueError: _description_
"""

@staticmethod
def get_gfa_format(gfa_file_path: str | list[str]) -> str | list[str]:
"""Given a file, or more, returns the gfa subtypes, and raises error if file is invalid or does not exists
Args:
gfa_file_path (str | list[str]): one or more file paths
Returns:
str | list[str]: a gfa subtype descriptor per input file
Raises:
OSError: The file does not exists
IOError: The file descriptor is invalid
IOError: The file is empty
"""
styles: list[str] = list()
if isinstance(gfa_file_path, str):
gfa_file_path = [gfa_file_path]
for gfa_file in gfa_file_path:
# Checking if path exists
if not path.exists(gfa_file):
raise OSError(
"Specified file does not exists. Please check provided path."
)
# Checking if file descriptor is valid
if not gfa_file.endswith('.gfa'):
raise IOError(
"File descriptor is invalid. Please check format, this lib is designed to work with Graphical Fragment Assembly (GFA) files."
)
# Checking if file is not empty
if stat(gfa_file).st_size == 0:
raise IOError(
"File is empty."
)
with open(gfa_file, 'r', encoding='utf-8') as gfa_reader:
header: str = gfa_reader.readline()
if header[0] != 'H':
styles.append('rGFA')
else:
try:
version_number: str = GFAParser.supplementary_datas(
header.strip('\n').split('\t'), 1
)["VN"]
if version_number == '1.0':
styles.append('GFA1')
elif version_number == '1.1':
styles.append('GFA1.1')
elif version_number == '1.2':
styles.append('GFA1.2')
elif version_number == '2.0':
styles.append('GFA2')
else:
styles.append('unknown')
except KeyError:
styles.append('rGFA')
if len(styles) == 1:
return styles[0]
return styles

@staticmethod
def get_gfa_type(tag_type: str) -> type | Callable:
"""Interprets tags of GFA as a Python-compatible format
Args:
tag_type (str): the letter that identifies the GFA data type
Raises:
NotImplementedError: happens if its an array or byte array (needs doc)
ValueError: happens if format is not in GFA standards
Returns:
type | Callable: the cast method or type to apply
"""
if tag_type == 'i':
return int
elif tag_type == 'f':
return float
elif tag_type == 'A' or tag_type == 'Z':
return str
elif tag_type == 'J':
return loads
elif tag_type == 'H' or tag_type == 'B':
raise NotImplementedError()
raise ValueError(
f"Type identifier {tag_type} is not in the GFA standard")

@staticmethod
def get_python_type(data: object) -> str:
"""Interprets tags of GFA as a Python-compatible format
Args:
tag_type (str): the letter that identifies the GFA data type
Raises:
NotImplementedError: happens if its an array or byte array (needs doc)
ValueError: happens if format is not in GFA standards
Returns:
type | Callable: the cast method or type to apply
"""
if isinstance(data, int):
return 'i'
elif isinstance(data, float):
return 'f'
elif isinstance(data, str):
return 'Z'
else:
try:
_: str = dumps(data, indent=0, separators=(',', ':'))
return 'J'
except (TypeError, OverflowError) as exc:
raise ValueError(
f"Type {type(data)} is not in the GFA standard") from exc

@staticmethod
def supplementary_datas(datas: list, length_condition: int) -> dict:
"""Computes the optional tags of a gfa line and returns them as a dict
Args:
datas (list): parsed data line
length_condition (int): last position of positional field
Returns:
dict: mapping tag:value
"""
mapping: dict = dict()
nargs: int = length_condition
if len(datas) > length_condition: # we happen to have additional tags to our line
for additional_tag in datas[length_condition:]:
if match('[A-Z]{2}:[a-zA-Z]{1}:', additional_tag): # matches start of the line
mapping[additional_tag[:2]] = GFAParser.get_gfa_type(
additional_tag[3])(additional_tag[5:])
else:
mapping[f"ARG{nargs}"] = additional_tag
nargs += 1
return mapping

@staticmethod
def read_gfa_line(datas: list[str], load_sequence_in_memory: bool = True) -> tuple[str, GFALine, dict]:
"""Calls methods to parse a GFA line,
accordingly to it's fields described in the GFAspec github.
Args:
datas (list): a list of the fileds in the line
load_sequence_in_memory (bool): if the line is a segment, ask to load its sequence
Returns:
tuple[str, GFALine, dict]: datas of the line in Python-compatible formats.
"""
line_datas: dict = dict()
match (line_type := GFALine(datas[0])):
case GFALine.SEGMENT:
line_datas["length"] = len(datas[2])
if load_sequence_in_memory:
line_datas["seq"] = datas[2]
return (sub('\D', '', datas[1]), line_type, {**line_datas, **GFAParser.supplementary_datas(datas, 3)})
case GFALine.LINE:
line_datas["start"] = sub('\D', '', datas[1])
line_datas["end"] = sub('\D', '', datas[3])
line_datas["orientation"] = f"{datas[2]}/{datas[4]}"
return ((line_datas['start'], line_datas['end']), line_type, {**line_datas, **GFAParser.supplementary_datas(datas, 5)})
case GFALine.WALK:
line_datas["id"] = datas[3]
line_datas["origin"] = int(datas[2])
line_datas["start_offset"] = datas[4]
line_datas["stop_offset"] = datas[5]
line_datas["path"] = [
(
node[1:],
Orientation(node[0])
)
for node in datas[6].replace('>', ',+').replace('<', ',-')[1:].split(',')
]
return (datas[1], line_type, {**line_datas, **GFAParser.supplementary_datas(datas, 7)})
case GFALine.PATH:
line_datas["id"] = datas[1]
line_datas["origin"] = None
line_datas["start_offset"] = None
line_datas["stop_offset"] = None
line_datas["path"] = [
(
node[:-1],
Orientation(node[-1])
)
for node in datas[2].split(',')
]
return (datas[1], line_type, {**line_datas, **GFAParser.supplementary_datas(datas, 7)})
case GFALine.HEADER | GFALine.ANY:
return (None, line_type, GFAParser.supplementary_datas(datas, 1))
Loading

0 comments on commit 17c96f2

Please sign in to comment.