Skip to content

Commit

Permalink
Add ability to introspect slurm files
Browse files Browse the repository at this point in the history
  • Loading branch information
cokelaer committed Sep 12, 2024
1 parent cca6c17 commit c0301b9
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 37 deletions.
3 changes: 2 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

:Overview: A set of tools to help building or using Sequana pipelines
:Status: Production
:Issues: Please fill a report on `github <https://github.com/sequana/sequana/issues>`__
:Issues: Please fill a report on `github <https://github.com/sequana/sequana_pipetools/issues>`__
:Python version: Python 3.8, 3.9, 3.10, 3.11
:Citation: Cokelaer et al, (2017), ‘Sequana’: a Set of Snakemake NGS pipelines, Journal of Open Source Software, 2(16), 352, `JOSS DOI doi:10.21105/joss.00352 <http://www.doi2bib.org/bib/10.21105%2Fjoss.00352>`_

Expand Down Expand Up @@ -313,6 +313,7 @@ Changelog
========= ======================================================================
Version Description
========= ======================================================================
1.0.5 * introspect slurm files to extract stats
1.0.4 * add utility function to download and untar a tar.gz file
1.0.3 * add levenshtein function. some typo corrections.
1.0.2 * add the dot2png command. pin docutils <0.21 due to pip error
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"
#maintainer ?#maintainer email
[tool.poetry]
name = "sequana_pipetools"
version = "1.0.4"
version = "1.0.5"
description = "A set of tools to help building or using Sequana pipelines"
authors = ["Sequana Team"]
license = "BSD-3"
Expand Down
10 changes: 2 additions & 8 deletions sequana_pipetools/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def download_and_extract_tar_gz(url, extract_to):
os.makedirs(extract_to, exist_ok=True)

# Download the file
logger.info(f"Downloading {filename}...")
logger.info(f"Downloaded {filename} to {file_path}")
response = requests.get(url, stream=True)
response.raise_for_status() # Raise an exception for HTTP errors
total_size = int(response.headers.get("content-length", 0))
Expand All @@ -60,21 +60,15 @@ def download_and_extract_tar_gz(url, extract_to):
file.write(chunk)
bar.update(len(chunk))

logger.info(f"Downloaded {filename} to {file_path}")

# Extract the tar.gz file
if tarfile.is_tarfile(file_path):
logger.info(f"Extracting {filename}...")

with tarfile.open(file_path, "r:gz") as tar:
tar.extractall(path=extract_to)
logger.info(f"Extracted to {extract_to}")
else:
logger.info(f"{file_path} is not a valid tar.gz file.")
logger.warning(f"{file_path} is not a valid tar.gz file.")

# Optionally, you can delete the .tar.gz file after extraction
os.remove(file_path)
logger.info("Process completed.")


def levenshtein_distance(token1: str, token2: str) -> int:
Expand Down
2 changes: 1 addition & 1 deletion sequana_pipetools/sequana_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def teardown(self, check_schema=True, check_input_files=True):
msg += "cd {}; sbatch {}.sh\n\n".format(self.workdir, self.name)
else:
msg += "cd {}; sh {}.sh\n\n".format(self.workdir, self.name)
msg += f"You may tune extra parameters related to snakemake in {self.workdir}/{self.name}/.sequana/profile_{self.options.profile}"
msg += f"You may tune extra parameters related to snakemake in {self.workdir}/.sequana/profile_{self.options.profile}"
print(self.colors.purple(msg))

# Save an info.txt with the command used
Expand Down
25 changes: 19 additions & 6 deletions sequana_pipetools/snaketools/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sequana_pipetools import get_package_version
from sequana_pipetools.misc import PipetoolsException
from sequana_pipetools.snaketools.errors import PipeError
from sequana_pipetools.snaketools.slurm import SlurmStats

from .file_factory import FastQFactory, FileFactory
from .module import Pipeline
Expand Down Expand Up @@ -142,8 +143,10 @@ def onerror(self):
try:
p = PipeError(self.name)
p.status()
print(f"\nIf you encoutered an error using sequana_{self.name}, please copy paste the above message and create a New Issue on https://github.com/sequana/{self.name}/issues")
except Exception as err: #pragma: no cover
print(
f"\nIf you encoutered an error using sequana_{self.name}, please copy paste the above message and create a New Issue on https://github.com/sequana/{self.name}/issues"
)
except Exception as err: # pragma: no cover
print

def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."):
Expand All @@ -154,10 +157,11 @@ def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."
cleaner.add_makefile()

# create the version file given the requirements
if os.path.exists(".sequana/tools.txt"):
with open(".sequana/tools.txt", "r") as fin:

if os.path.exists(f"{outdir}/.sequana/tools.txt"):
with open(f"{outdir}/.sequana/tools.txt", "r") as fin:
deps = fin.readlines()
with open(".sequana/versions.txt", "w") as fout:
with open(f"{outdir}/.sequana/versions.txt", "w") as fout:
from versionix.parser import get_version

for dep in deps:
Expand All @@ -172,7 +176,16 @@ def teardown(self, extra_dirs_to_remove=[], extra_files_to_remove=[], outdir="."
print("\u2705 Another successful analysis. Open summary.html in your browser. Have fun.")
else:
print("\u2705 Another successful analysis. Have fun.")
print("\u2705 Please consider citing us would you use Sequana in your research. See https://sequana.readthedocs.io or cite: \n\n\tCokelaer et al. Sequana': a Set of Snakemake NGS pipelines, (2007) JOSS 2(16)")
print(
"\u2705 Please consider citing us would you use Sequana in your research. See https://sequana.readthedocs.io or cite: \n\n\tCokelaer et al. Sequana': a Set of Snakemake NGS pipelines, (2007) JOSS 2(16)"
)

# for HPC with slurm only
try:
slurm_stats = SlurmStats(outdir)
slurm_stats.to_csv(f"{outdir}.sequana/slurm_stats.txt")
except Exception:
pass

def get_html_summary(self, float="left", width=30):
import pandas as pd
Expand Down
111 changes: 98 additions & 13 deletions sequana_pipetools/snaketools/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,111 @@
# Documentation: http://sequana.readthedocs.io
# Contributors: https://github.com/sequana/sequana/graphs/contributors
##############################################################################
import re
import subprocess
from pathlib import Path

import colorlog
import parse

logger = colorlog.getLogger(__name__)

__all__ = ["SlurmStats", "SlurmParsing"]

class SlurmParsing:

class SlurmData:
    """Locate the slurm log files produced in a pipeline working directory.

    :param working_directory: directory searched for the master ``slurm-*`` file(s).
    :param logs_directory: sub-directory of *working_directory* holding the job logs.
    :param pattern: glob pattern, relative to the logs directory, selecting job logs.

    After construction, ``master`` is the last ``slurm-*`` path in sorted order
    (``None`` when no such file exists) and ``slurms`` is the sorted list of
    job log files matching *pattern*.
    """

    def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):
        working_directory = Path(working_directory)

        # get the master slurm file
        # NOTE: paths are sorted lexicographically, so the "last" file is not
        # necessarily the one with the highest numeric job ID.
        main_slurms = sorted(working_directory.glob("slurm-*"))
        if main_slurms:
            self.master = main_slurms[-1]
            print(f"Found slurm master {self.master}")
        else:
            self.master = None

        log_dir = working_directory / logs_directory
        self.slurms = sorted(log_dir.glob(pattern))


# not tested because requires sacct command
class SlurmStats(SlurmData):  # pragma: nocover
    """Collect resource usage of the slurm jobs found in a working directory.

    For each job log file, the task name and slurm job ID are extracted from
    the file name, ``sacct`` is called for that ID, and the parsed values are
    stored in ``results`` as ``[task, memory_gb, thread, time, cpu_time]``
    (same order as ``columns``).

    :param working_directory: directory searched for the master ``slurm-*`` file(s).
    :param logs_directory: sub-directory of *working_directory* holding the job logs.
    :param pattern: glob pattern, relative to the logs directory, selecting job logs.
    """

    # One sacct row: optional MaxRSS, then AllocCPUS, Elapsed and CPUTime.
    _SACCT_ROW = re.compile(r"(\S+)?\s+(\d+)\s+(\d{2}:\d{2}:\d{2})\s+(\d{2}:\d{2}:\d{2})")

    # Size suffixes and their factor expressed in kilobytes.
    _UNIT_TO_KB = {"K": 1, "M": 1024, "G": 1024**2, "T": 1024**3}

    def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):
        super().__init__(working_directory, logs_directory, pattern)

        results = []
        logger.info(f"Introspecting {len(self.slurms)} slurm files")
        for filename in self.slurms:
            # the job ID follows "-slurm-" and the task name is the first
            # dash-separated token of the file name
            ID = filename.name.split("-slurm-")[-1].replace(".out", "")
            task = filename.name.split("-")[0]

            # query slurm accounting for this job ID (argument list, no shell)
            cmd = ["sacct", "-j", ID, "--format", "maxRSS,AllocCPUS,Elapsed,CPUTime"]
            call = subprocess.run(cmd, stdout=subprocess.PIPE)
            if call.returncode == 0:
                # NOTE: jobinfo is empty when the sacct output could not be
                # parsed; the row then only contains the task name.
                jobinfo = self._parse_sacct_output(call.stdout.decode())
                results.append([task] + jobinfo)
            else:
                print(" ".join(cmd))

        self.results = results
        self.columns = ["task", "memory_gb", "thread", "time", "cpu_time"]

    def to_csv(self, outfile):
        """Write the header and one comma-separated line per job into *outfile*.

        :param outfile: path of the file to create (overwritten if it exists).
        """
        with open(outfile, "w") as fout:
            # each record must end with a newline, otherwise the header and
            # all rows end up concatenated on a single line
            fout.write(",".join(self.columns) + "\n")
            for result in self.results:
                fout.write(",".join(str(x) for x in result) + "\n")

    def _parse_sacct_output(self, output):
        """Parse the output of sacct and return the values of the main job.

        The output is supposed to have 4 entries in this order:
        MaxRSS AllocCPUS Elapsed CPUTime and is solely used by :class:`~SlurmStats`

        :param output: decoded standard output of the sacct command.
        :return: ``[memory_gb, alloc_cpus, elapsed, cpu_time]`` built from the
            second line following the two header lines, or an empty list if
            that line is missing or does not have the expected layout.
        """
        # Split the output into lines and remove the two header lines
        lines = output.strip().split("\n")[2:]

        # Only the second remaining line (main job) is of interest
        if len(lines) < 2:
            return []

        match = self._SACCT_ROW.search(lines[1])
        if match is None:
            return []

        maxrss = match.group(1) or "0K"  # Handle empty MaxRSS case
        alloccpus = int(match.group(2))
        elapsed = match.group(3)
        cputime = match.group(4)

        return [self._kb_to_gb(maxrss), alloccpus, elapsed, cputime]

    def _kb_to_gb(self, kb_str):
        """Convert a memory string such as ``264364K`` into gigabytes.

        A trailing K, M, G or T suffix (any case) is honoured; a value without
        suffix is interpreted as kilobytes.

        :param kb_str: memory value as a string.
        :return: the size in gigabytes, rounded to six decimal places.
        :raises ValueError: if the numeric part cannot be converted to a float.
        """
        value = kb_str.strip()
        factor = 1
        if value and value[-1].upper() in self._UNIT_TO_KB:
            factor = self._UNIT_TO_KB[value[-1].upper()]
            value = value[:-1]

        kb = float(value) * factor

        # Convert kilobytes to gigabytes (1 GB = 1024^2 KB)
        return round(kb / (1024**2), 6)


class SlurmParsing(SlurmData):
"""Helper for sequana jobs debugging on slurm cluster.
Assumptions:
Expand Down Expand Up @@ -49,18 +145,7 @@ class SlurmParsing:
}

def __init__(self, working_directory, logs_directory="logs", pattern="*/*slurm*.out"):

# get the master slurm file
main_slurms = list(Path(working_directory).glob("slurm-*"))

try:
self.master = sorted(main_slurms)[-1]
print(f"Found slurm master {self.master}")
except Exception as err:
self.master = None

log_dir = Path(working_directory) / logs_directory
self.slurms = sorted([f for f in log_dir.glob(pattern)])
super(SlurmParsing, self).__init__(working_directory, logs_directory, pattern)

# no sys exit (even zero) since it is used within snakemake
N = len(self.slurms)
Expand Down
6 changes: 2 additions & 4 deletions tests/scripts/test_main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
import os
import sys

from click.testing import CliRunner

Expand Down Expand Up @@ -58,10 +58,8 @@ def test_slurm_diag():
assert results.exit_code == 0


def test_dot2png():
def test_dot2png(tmpdir):
runner = CliRunner()
dotfile = os.path.join(test_dir, "..", "data", "test_dag.dot")
results = runner.invoke(main, ["--dot2png", dotfile])
assert results.exit_code == 0


10 changes: 8 additions & 2 deletions tests/snaketools/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ def test_pipeline_manager(tmpdir):
pm = snaketools.PipelineManager("custom", cfg)
assert not pm.paired
working_dir = tmpdir.mkdir("temp")
pm.teardown(outdir=working_dir)

# when using the pipeline manager, it does not create .sequana, tools, etc
# this is created by the sequana_pipetools.sequana_manager
(working_dir / ".sequana").mkdir()
ff = Path(working_dir / ".sequana" / "tools.txt")
ff.touch()
ff = Path(working_dir / ".sequana" / "version.txt")
ff.touch()

# here not readtag provided, so data is considered to be non-fastq related
# or at least not paired
Expand Down Expand Up @@ -197,7 +204,6 @@ def test_directory():
pm = snaketools.pipeline_manager.PipelineManagerDirectory("test", cfg)



def test_pipeline_others():
cfg = SequanaConfig({})
file1 = os.path.join(test_dir, "data", "Hm2_GTGAAA_L005_R1_001.fastq.gz")
Expand Down
41 changes: 40 additions & 1 deletion tests/snaketools/test_slurm.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from sequana_pipetools.snaketools.slurm import SlurmParsing
from sequana_pipetools.snaketools.slurm import SlurmParsing, SlurmStats

from . import test_dir

Expand All @@ -19,3 +20,41 @@ def test_slurm():
print(dj)
dj
dj._report()


@pytest.fixture
def mock_sacct_output():
    """Return a canned sacct report: two header lines followed by three job lines."""
    sacct_report = """
MaxRSS AllocCPUS Elapsed CPUTime
---------- ---------- ---------- ----------
1 00:00:24 00:00:24
264364K 1 00:00:24 00:00:24
36K 1 00:00:24 00:00:24
"""
    return sacct_report


def test_parse_sacct_output(mock_sacct_output, tmpdir):
    """Check that the main job line of a sacct report is parsed correctly."""
    # Build the object on an empty temporary directory: no slurm file is found,
    # so the constructor never calls the real sacct command and the test does
    # not depend on the content of the current working directory.
    slurm_stats = SlurmStats(working_directory=str(tmpdir), logs_directory="logs")
    assert slurm_stats.results == []

    # Test the _parse_sacct_output method
    job_info = slurm_stats._parse_sacct_output(mock_sacct_output)

    # Assert the parsing is correct (264364K expressed in GB)
    assert job_info == [0.252117, 1, "00:00:24", "00:00:24"]

    # An empty report has no main job line
    assert slurm_stats._parse_sacct_output("") == []


@patch("subprocess.run")
def test_slurm_stats_with_mocked_sacct(mock_run, mock_sacct_output, tmpdir):
    """Run SlurmStats on shared slurm logs with the sacct call replaced by a mock."""
    # Mock the subprocess.run method so that every sacct call succeeds and
    # returns the canned report
    mock_run.return_value = MagicMock(stdout=mock_sacct_output.encode("utf-8"), returncode=0)

    # Initialize SlurmStats object on the shared example directory
    slurm_stats = SlurmStats(working_directory=sharedir / "slurm_error1")

    # One sacct call and one result row per slurm log file
    assert len(slurm_stats.results) == 9
    assert mock_run.call_count == 9

    # Each row is the task name followed by the four values parsed from the
    # main job line of the canned report
    for row in slurm_stats.results:
        assert row[0]
        assert row[1:] == [0.252117, 1, "00:00:24", "00:00:24"]

    # The CSV file must be created and start with the column names
    outfile = tmpdir.join("test.csv")
    slurm_stats.to_csv(str(outfile))
    assert outfile.read().startswith(",".join(slurm_stats.columns))

0 comments on commit c0301b9

Please sign in to comment.