Skip to content

Commit

Permalink
Merge pull request #21 from ayasyrev/dev
Browse files Browse the repository at this point in the history
0.0.7
  • Loading branch information
ayasyrev authored Nov 17, 2023
2 parents 0830abd + c294eb6 commit e903244
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 105 deletions.
14 changes: 10 additions & 4 deletions src/nbmetaclean/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
from pathlib import Path

from .clean import clean_nb_file
from .clean import CleanConfig, clean_nb_file
from .core import get_nb_names

parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -41,13 +41,19 @@ def app() -> None:
print(f"{path} not exists!")
if not cfg.silent:
print(f"notebooks to check: {len(nb_files)} ")
cleaned = clean_nb_file(
cleaned, errors = clean_nb_file(
nb_files,
silent=cfg.silent,
preserve_timestamp=not cfg.not_pt,
CleanConfig(
silent=cfg.silent,
preserve_timestamp=not cfg.not_pt,
),
)
if not cfg.silent:
print(f"cleaned nbs: {len(cleaned)}")
if errors:
print(f"with errors: {len(errors)}")
for nb, exc in errors:
print(f"{nb}: {exc}")


if __name__ == "__main__":
Expand Down
186 changes: 116 additions & 70 deletions src/nbmetaclean/clean.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
from __future__ import annotations
import copy
from dataclasses import dataclass
import os

from pathlib import Path
from typing import Optional, Union
from typing import Iterable, Optional, Union

from nbmetaclean.core import read_nb, write_nb

from .core import read_nb, write_nb, PathOrStr
from .typing import NbNode, Metadata

NB_METADATA_PRESERVE_MASKS = [
NB_METADATA_PRESERVE_MASKS = (
("language_info", "name"),
]
("authors",),
)


@dataclass
class CleanConfig:
"""Clean config.
Args:
clear_nb_metadata (bool, optional): Clear notebook metadata. Defaults to True.
clear_cell_metadata (bool, optional): Clear cell metadata. Defaults to False.
clear_execution_count (bool, optional): Clear cell execution count. Defaults to True.
clear_outputs (bool, optional): Clear cell outputs. Defaults to False.
preserve_timestamp (bool, optional): Preserve timestamp. Defaults to True.
silent (bool, optional): Silent mode. Defaults to False.
nb_metadata_preserve_mask (Optional[Iterable[tuple[str, ...]]], optional):
Preserve masks for notebook metadata. Defaults to None.
cell_metadata_preserve_mask (Optional[Iterable[tuple[str, ...]]], optional):
Preserve masks for cell metadata. Defaults to None.
mask_merge (bool, optional): If True, add `nb_metadata_preserve_mask` to the default masks.
If False, use only `nb_metadata_preserve_mask`. Defaults to True.
"""

clear_nb_metadata: bool = True
clear_cell_metadata: bool = False
clear_execution_count: bool = True
clear_outputs: bool = False
preserve_timestamp: bool = True
silent: bool = False
nb_metadata_preserve_mask: Optional[Iterable[tuple[str, ...]]] = None
cell_metadata_preserve_mask: Optional[Iterable[tuple[str, ...]]] = None
mask_merge: bool = True


def filter_meta_mask(
nb_meta: Union[Metadata, str],
mask: [tuple[str, ...]] = None,
) -> Union[Metadata, str]:
nb_meta: Union[str, int, Metadata],
mask: Optional[Iterable[tuple[str, ...]]] = None,
) -> Union[str, int, Metadata]:
"""Filter metadata by mask. If no mask return empty dict."""
if isinstance(nb_meta, str) or mask == ():
if isinstance(nb_meta, (str, int)) or mask == ():
return nb_meta
if mask is None:
return {}
Expand All @@ -37,55 +70,65 @@ def filter_metadata(
"""Clean notebooknode metadata."""
if masks is None:
return {}
filtered_meta = {}
filtered_meta: Metadata = {}
for mask in masks:
filtered_meta.update(filter_meta_mask(nb_meta, mask))
filtered_meta.update(filter_meta_mask(nb_meta, mask)) # type: ignore
return filtered_meta


def clean_cell_metadata(
def clean_cell(
cell: NbNode,
clear_execution_count: bool = True,
clear_outputs: bool = False,
preserve_cell_metadata_mask: Optional[list[tuple[str, ...]]] = None,
cfg: CleanConfig,
) -> bool:
"""Clean cell metadata."""
"""Clean cell: optionally metadata, execution_count and outputs."""
changed = False
if metadata := cell.get("metadata", None):
old_metadata = copy.deepcopy(metadata)
cell["metadata"] = filter_metadata(metadata, preserve_cell_metadata_mask)
if cell["metadata"] != old_metadata:
changed = True
if clear_outputs and cell.get("outputs"):
cell["outputs"] = []
changed = True
if clear_execution_count and cell.get("execution_count"):

if cfg.clear_cell_metadata:
if metadata := cell.get("metadata", None):
old_metadata = copy.deepcopy(metadata)
cell["metadata"] = filter_metadata(
metadata, cfg.cell_metadata_preserve_mask
)
if cell["metadata"] != old_metadata:
changed = True

if cfg.clear_execution_count and cell.get("execution_count"):
cell["execution_count"] = None
changed = True
if outputs := cell.get("outputs"):
for output in outputs:
if clear_execution_count and output.get("execution_count", None):
output["execution_count"] = None

if cell.get("outputs"):
if cfg.clear_outputs:
cell["outputs"] = []
changed = True
elif cfg.clear_cell_metadata or cfg.clear_execution_count:
result = clean_outputs(cell["outputs"], cfg)
if result:
changed = True

return changed


def clean_outputs(outputs: list[NbNode], cfg: CleanConfig) -> bool:
"""Clean outputs."""
changed = False
for output in outputs:
if cfg.clear_execution_count and output.get("execution_count", None):
output["execution_count"] = None
changed = True
if cfg.clear_cell_metadata and (metadata := output.get("metadata", None)):
old_metadata = copy.deepcopy(metadata)
output["metadata"] = filter_metadata(
metadata, cfg.cell_metadata_preserve_mask
)
if output["metadata"] != old_metadata:
changed = True
if metadata := output.get("metadata", None):
old_metadata = copy.deepcopy(metadata)
output["metadata"] = filter_metadata(
metadata, preserve_cell_metadata_mask
)
if output["metadata"] != old_metadata:
changed = True
return changed


def clean_nb(
nb: NbNode,
clear_nb_metadata: bool = True,
clear_cell_metadata: bool = True,
clear_execution_count: bool = True,
clear_outputs: bool = False,
preserve_nb_metadata_masks: Optional[list[tuple[str, ...]],] = None,
preserve_cell_metadata_mask: Optional[str] = None,
) -> tuple[NbNode, bool]:
cfg: CleanConfig,
) -> bool:
"""Clean notebook - metadata, execution_count, outputs.
Args:
Expand All @@ -97,69 +140,72 @@ def clean_nb(
bool: True if changed.
"""
changed = False
if clear_nb_metadata and (metadata := nb.get("metadata")):
if cfg.clear_nb_metadata and (metadata := nb.get("metadata")):
old_metadata = copy.deepcopy(metadata)
masks = preserve_nb_metadata_masks or NB_METADATA_PRESERVE_MASKS
masks = NB_METADATA_PRESERVE_MASKS
if cfg.nb_metadata_preserve_mask:
if not cfg.mask_merge:
masks = cfg.nb_metadata_preserve_mask
else:
masks = cfg.nb_metadata_preserve_mask + masks

nb["metadata"] = filter_metadata(metadata, masks=masks)
if nb["metadata"] != old_metadata:
changed = True
if clear_cell_metadata:
if cfg.clear_cell_metadata or cfg.clear_execution_count or cfg.clear_outputs:
for cell in nb["cells"]:
result = clean_cell_metadata(
result = clean_cell(
cell,
clear_execution_count=clear_execution_count,
clear_outputs=clear_outputs,
preserve_cell_metadata_mask=preserve_cell_metadata_mask,
cfg,
)
if result:
changed = True

return nb, changed
return changed


def clean_nb_file(
path: Union[PathOrStr, list[PathOrStr]],
clear_nb_metadata: bool = True,
clear_cell_metadata: bool = True,
clear_execution_count: bool = True,
clear_outputs: bool = False,
preserve_timestamp: bool = True,
silent: bool = False,
) -> list[Path]:
path: Union[Path, list[Path]],
cfg: Optional[CleanConfig] = None,
) -> tuple[list[Path], list[tuple[Path, Exception]]]:
"""Clean metadata and execution count from notebook.
Args:
path (Union[Path, list[Path]]): Notebook filename or list of filenames.
clear_nb_metadata (bool): Clear notebook metadata. Defaults to True.
clear_cell_metadata (bool): Clear cell metadata. Defaults to True.
clear_cell_metadata (bool): Clear cell metadata. Defaults to False.
clear_outputs (bool): Clear outputs. Defaults to False.
preserve_timestamp (bool): Preserve timestamp. Defaults to True.
clear_execution_count (bool, optional): Clean execution count. Defaults to True.
silent (bool): Silent mode. Defaults to False.
Returns:
List[Path]: List of cleaned notebooks
tuple[list[Path], list[tuple[Path, Exception]]]: List of cleaned notebooks, list of (notebook, exception) pairs for notebooks that could not be read.
"""
if cfg is None:
cfg = CleanConfig()
if not isinstance(path, list):
path = [path]
cleaned: list[Path] = []
errors: list[tuple[Path, Exception]] = []
to_clean = len(path)
for num, filename in enumerate(path):
nb = read_nb(filename)
nb, result = clean_nb(
try:
nb = read_nb(filename)
except Exception as ex:
errors.append((filename, ex))
continue
result = clean_nb(
nb,
clear_execution_count=clear_execution_count,
clear_outputs=clear_outputs,
clear_nb_metadata=clear_nb_metadata,
clear_cell_metadata=clear_cell_metadata,
cfg,
)
if result:
cleaned.append(filename)
if preserve_timestamp:
if cfg.preserve_timestamp:
stat = filename.stat()
write_nb(nb, filename)
if preserve_timestamp:
if cfg.preserve_timestamp:
os.utime(filename, (stat.st_atime, stat.st_mtime))
if not silent:
if not cfg.silent:
print(f"done {num + 1} of {to_clean}: {filename}")
return cleaned
return cleaned, errors
12 changes: 7 additions & 5 deletions src/nbmetaclean/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def write_nb(
def get_nb_names(
path: Optional[PathOrStr] = None,
recursive: bool = True,
filter_hidden: bool = True,
hidden: bool = False,
) -> list[Path]:
"""Return list of notebooks from `path`. If no `path` return notebooks from current folder.
Args:
path (Union[Path, str, None]): Path for nb or folder with notebooks.
recursive bool: Recursive search.
filter_hidden bool: Filter hidden paths.
hidden bool: Include hidden paths (names starting with "."). If False, they are skipped. Defaults to False.
Raises:
sys.exit: If the filename or dir does not exist or is not a notebook file.
Expand All @@ -75,14 +75,16 @@ def get_nb_names(
result = []
for item in nb_path.iterdir():
if item.is_file() and item.suffix == ".ipynb":
if filter_hidden and item.name.startswith("."):
if not hidden and item.name.startswith("."):
continue
result.append(item)
if item.is_dir():
if recursive:
if filter_hidden and item.name.startswith("."):
if not hidden and item.name.startswith("."):
continue
result.extend(get_nb_names(item, recursive, filter_hidden))
if "checkpoint" in item.name:
continue
result.extend(get_nb_names(item, recursive, hidden))

return result

Expand Down
2 changes: 1 addition & 1 deletion src/nbmetaclean/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.6" # pragma: no cover
__version__ = "0.0.7" # pragma: no cover
Loading

0 comments on commit e903244

Please sign in to comment.