diff --git a/.github/workflows/run-unittest.yml b/.github/workflows/run-unittest.yml index 1da2119..706762d 100644 --- a/.github/workflows/run-unittest.yml +++ b/.github/workflows/run-unittest.yml @@ -5,7 +5,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.11, 3.12] + python-version: [3.11, 3.12, 3.13] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} @@ -15,4 +15,4 @@ jobs: - name: Install dependencies run: pip install -e . - name: Run tests - run: python3 tests.py \ No newline at end of file + run: python3 -m unittest discover tests diff --git a/README.md b/README.md index b30355a..12f48d1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,10 @@ +

Deduplidog – Deduplicator that covers your back.

+

+ +

+ [![Build Status](https://github.com/CZ-NIC/deduplidog/actions/workflows/run-unittest.yml/badge.svg)](https://github.com/CZ-NIC/deduplidog/actions) -Yet another file deduplicator. - [About](#about) * [What are the use cases?](#what-are-the-use-cases) @@ -18,9 +22,9 @@ Yet another file deduplicator. # About ## What are the use cases? -* I have downloaded photos and videos from the cloud. Oh, both Google Photos and Youtube shrink the file and changes the format. Moreover, it have shortened the file name to 47 characters and capitalize the extension. So how should I know that I have them all backed up offline? +* I have downloaded photos and videos from the cloud. Oh, both Google Photos and YouTube *shrink the files* and change their format. Moreover, they shorten the file names to 47 characters and capitalize the extensions. So how am I supposed to know if I have everything backed up offline when the copies are resized? * My disk is cluttered with several backups and I'd like to be sure these are all just copies. -* I merge data from multiple sources. Some files in the backup might have the former orignal file modification date that I might wish to restore. +* I merge data from multiple sources. Some files in the backup might have *the former original file modification date* that I might wish to restore. ## What is compared? @@ -53,19 +57,30 @@ The program does not write anything to the disk, unless `execute=True` is set. F Install with `pip install deduplidog`. -It works as a standalone program with both CLI and TUI interfaces. Just launch the `deduplidog` command. -Moreover, it works best when imported from a [Jupyter Notebook](https://jupyter.org/). +It works as a standalone program with CLI, TUI and GUI interfaces. Just launch the `deduplidog` command. # Examples +## Media magic confirmation + +Let's compare two folders. 
+ +```bash +deduplidog --work-dir folder1 --original-dir folder2 --media-magic --rename --execute +``` + +By default, `--confirm-one-by-one` is True, causing every change to be manually confirmed before it takes effect. So even though `--execute` is there, no change happens without confirmation. The change that happens is the `--rename`: the file in the `--work-dir` will be prefixed with the `✓` character. The `--media-magic` mode considers an image a duplicate if it has the same name and a similar image hash, even if the files are of different sizes. + +![Confirmation](asset/warnings_confirmation_example.avif "Confirmation, including warnings") + +Note that the default button is 'No' as there are some warnings. First, the file in the folder we search for duplicates in is bigger than the one in the original folder. Second, it is also older, suggesting that it might be the actual original. + + ## Duplicated files Let's take a closer look to a use-case. -```python3 -import logging -from deduplidog import Deduplidog - -Deduplidog("/home/user/duplicates", "/media/disk/origs", ignore_date=True, rename=True) +```bash +deduplidog --work-dir /home/user/duplicates --original-dir /media/disk/origs --ignore-date --rename ``` This command produced the following output: @@ -85,9 +100,8 @@ Warnings: 1 We found out all the files in the *duplicates* folder seem to be useless but one. It's date is earlier than the original one. The life buoy icon would prevent any action. To suppress this, let's turn on `set_both_to_older_date`. See with full log. -```python3 -Deduplidog("/home/user/duplicates", "/media/disk/origs", - ignore_date=True, rename=True, set_both_to_older_date=True, log_level=logging.INFO) +```bash +deduplidog --work-dir /home/user/duplicates --original-dir /media/disk/origs --ignore-date --rename --set-both-to-older-date --log-level=10 ``` ``` @@ -112,9 +126,8 @@ Affected size: 59.9 kB You see, the log is at the most brief, yet transparent form. 
The files to be affected at the work folder are prepended with the 🔨 icon whereas those affected at the original folder uses 📄 icon. We might add `execute=True` parameter to perform the actions. Or use `inspect=True` to inspect. -```python3 -Deduplidog("/home/user/duplicates", "/media/disk/origs", - ignore_date=True, rename=True, set_both_to_older_date=True, inspect=True) +```bash +deduplidog --work-dir /home/user/duplicates --original-dir /media/disk/origs --ignore-date --rename --set-both-to-older-date --inspect ``` The `inspect=True` just produces the commands we might subsequently use. @@ -133,7 +146,7 @@ You face a directory that might contain some images twice. Let's analyze. We tur ``` $ deduplidog --work-dir ~/shuffled/ --media-magic --ignore-name --skip-bigger --log-level=20 Only files with media suffixes are taken into consideration. Nor the size nor the date is compared. Nor the name! -Duplicates from the work dir at 'shuffled' (only if smaller than the pair file) would be (if execute were True) left intact (because no action is selected). +Duplicates from the work dir at 'shuffled' (only if smaller than the pair file) would be (if execute were True) left intact (because no action is selected, nothing will happen). Number of originals: 9 Caching image hashes: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 9/9 [00:00<00:00, 16.63it/s] @@ -197,7 +210,15 @@ Find the duplicates. Normally, the file must have the same size, date and name. | output | bool | False | Stores the output log to a file in the current working directory. (Never overwrites an older file.) | ## Utils -In the `deduplidog.utils` packages, you'll find a several handsome tools to help you. You will find parameters by using you IDE hints. + +The library might be invoked from a [Jupyter Notebook](https://jupyter.org/). 
+ +```python3 +from deduplidog import Deduplidog +Deduplidog("/home/user/duplicates", "/media/disk/origs", ignore_date=True, rename=True).start() +``` + +In the `deduplidog.utils` package, you'll find several handy tools to help you. You can look up their parameters through your IDE hints. ### `images` *`urls: Iterable[str | Path]`* Display a ribbon of images. diff --git a/asset/logo.jpg b/asset/logo.jpg new file mode 100644 index 0000000..5042e16 Binary files /dev/null and b/asset/logo.jpg differ diff --git a/asset/warnings_confirmation_example.avif b/asset/warnings_confirmation_example.avif new file mode 100644 index 0000000..002d00e Binary files /dev/null and b/asset/warnings_confirmation_example.avif differ diff --git a/deduplidog/__main__.py b/deduplidog/__main__.py index 4696716..cf9674f 100644 --- a/deduplidog/__main__.py +++ b/deduplidog/__main__.py @@ -1,16 +1,14 @@ import sys -from mininterface import run +from mininterface import Cancelled, run from .deduplidog import Deduplidog def main(): + # NOTE: I'd like to have the default in case work dir is not specified args=("--work-dir", str(Path.cwd())) + # Currently, args overrides CLI arguments. 
with run(Deduplidog, interface=None) as m: - # with run(Deduplidog, interface="tui") as m: - # m = run(Deduplidog, interface="gui") - # if 1: - # m.facet._layout # TODO try: while True: print("") @@ -23,6 +21,8 @@ def main(): # deduplidog.perform() # else: m.env.start(m) + except Cancelled: + continue except Exception as e: print("-"*100) print(e) diff --git a/deduplidog/cli.py b/deduplidog/cli.py deleted file mode 100644 index fabe530..0000000 --- a/deduplidog/cli.py +++ /dev/null @@ -1,15 +0,0 @@ -import click -from dataclass_click import dataclass_click - -from .deduplidog import Deduplidog - - -class RaiseOnMissingParam(click.Command): - def __call__(self, *args, **kwargs): - return super(RaiseOnMissingParam, self).__call__(*args, standalone_mode=False, **kwargs) - - -@click.command(cls=RaiseOnMissingParam) -@dataclass_click(Deduplidog) -def cli(dd: Deduplidog): - return dd diff --git a/deduplidog/deduplidog.py b/deduplidog/deduplidog.py index 98d18aa..f55cd49 100644 --- a/deduplidog/deduplidog.py +++ b/deduplidog/deduplidog.py @@ -171,7 +171,7 @@ class Deduplidog: media: OmitArgPrefixes[Media] helper: OmitArgPrefixes[Helper] - work_dir: Path + work_dir: Path = Path.cwd() """Folder of the files suspectible to be duplicates.""" original_dir: Path | None = None @@ -255,6 +255,7 @@ def start(self, interface=None): self.reset() self.check() self.perform() + return self def perform(self): # build file list of the originals @@ -406,7 +407,7 @@ def check(self): def _get_action(self, passive=False): action = self.action.rename, self.action.replace_with_original, self.action.delete, self.action.replace_with_symlink if not sum(action): - return f"{'left' if passive else 'leave'} intact (because no action is selected)" + return f"{'left' if passive else 'leave'} intact (because no action is selected, nothing will happen)" elif sum(action) > 1: raise AssertionError("Choose only one execute action (like only rename).") elif self.action.rename: @@ -697,9 +698,9 @@ def 
_find_similar(self, work_file: Path, candidates: list[Path]): for original in candidates: ost, wst = original.stat(), work_file.stat() if (self.match.ignore_date - or wst.st_mtime == ost.st_mtime - or self.match.tolerate_hour and self.match.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.match.tolerate_hour[1] - ) and (self.match.ignore_size or wst.st_size == ost.st_size and (not self.match.checksum or crc(original) == crc(work_file))): + or wst.st_mtime == ost.st_mtime + or self.match.tolerate_hour and self.match.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.match.tolerate_hour[1] + ) and (self.match.ignore_size or wst.st_size == ost.st_size and (not self.match.checksum or crc(original) == crc(work_file))): return original def _find_similar_media(self, work_file: Path, comparing_image: bool, candidates: list[Path]): diff --git a/deduplidog/form.tcss b/deduplidog/form.tcss deleted file mode 100644 index d1b0929..0000000 --- a/deduplidog/form.tcss +++ /dev/null @@ -1,10 +0,0 @@ -Screen { - align: center middle; -} - -VerticalScroll { - width: auto; - height: auto; - background: $boost; - padding: 2; -} diff --git a/deduplidog/tui.py b/deduplidog/tui.py deleted file mode 100644 index b8ccd33..0000000 --- a/deduplidog/tui.py +++ /dev/null @@ -1,65 +0,0 @@ -from dataclasses import dataclass, field - -from textual import events -from textual.app import App, ComposeResult -from textual.containers import VerticalScroll -from textual.widgets import Checkbox, Footer, Input, Label - - -@dataclass -class TuiState: - INPUTS: list = field(default_factory=list) - FOCUSED_I: int = 0 - - -tui_state = TuiState() - - -class CheckboxApp(App[None]): - CSS_PATH = "form.tcss" - - BINDINGS = [ - ("up", "go_up", "Go up"), - ("down", "go_up", "Go down"), - ("ctrl+s", "confirm", "Run"), # ctrl/alt+enter does not work; enter does not work with checkboxes - ("escape", "exit", "Exit"), - ] - - def compose(self) -> ComposeResult: - yield Footer() - self.inputs = 
tui_state.INPUTS - with VerticalScroll(): - for input in self.inputs: - if isinstance(input, Input): - yield Label(input.placeholder) - yield input - yield Label(input._link.help) - yield Label("") - - def on_mount(self): - self.inputs[tui_state.FOCUSED_I].focus() - - def action_confirm(self): - # next time, start on the same widget - tui_state.FOCUSED_I = next((i for i, inp in enumerate(self.inputs) if inp == self.focused), None) - self.exit(True) - - def action_exit(self): - self.exit() - - def on_key(self, event: events.Key) -> None: - try: - index = self.inputs.index(self.focused) - except ValueError: # probably some other element were focused - return - match event.key: - case "down": - self.inputs[(index + 1) % len(self.inputs)].focus() - case "up": - self.inputs[(index - 1) % len(self.inputs)].focus() - case letter if len(letter) == 1: # navigate by letters - for inp_ in self.inputs[index+1:] + self.inputs[:index]: - label = inp_.label if isinstance(inp_, Checkbox) else inp_.placeholder - if str(label).casefold().startswith(letter): - inp_.focus() - break diff --git a/tests.py b/tests.py deleted file mode 100644 index 684fa87..0000000 --- a/tests.py +++ /dev/null @@ -1,184 +0,0 @@ - -from collections.abc import Mapping -from dataclasses import dataclass -from itertools import chain -import os -from pathlib import Path -from tempfile import TemporaryDirectory, mkdtemp -from typing import Self -from unittest import TestCase, main -import random -import string - -from deduplidog import Deduplidog -from deduplidog.deduplidog import Action, Execution, Match, Media, Helper - - -@dataclass -class FileRepresentation: - path: Path - mtime: int = 0 - "relative mtime" - text_seed: int = 1 - - def __post_init__(self): - self._mtime = round(self.path.parent.parent.stat().st_mtime + self.mtime) - - def write(self): - "Writes the representation to the disk." 
- self.path.write_text(self.get_text()) - os.utime(self.path, (self._mtime,)*2) - return self - - def check(self, test: TestCase): - "Checks the disk whether it contains the file represented." - test.assertTrue(self.path.exists(), msg=self.path) - test.assertEqual(self.get_text(), self.path.read_text(), msg=self.path) - test.assertEqual(self._mtime, self.path.stat().st_mtime, msg=self.path) - - def get_text(self): - random.seed(self.text_seed) - return ''.join(random.choices(string.ascii_letters + string.digits, k=10+self.text_seed*10)) - - def prefixed(self): - self.path = self.path.with_name("✓" + self.path.name) - - def suck(self, other: Self): - "Use the other file. Use its name, however stays in the current directory." - self.path = self.path.with_name(other.path.name) - self._mtime = other._mtime - self.text_seed = other.text_seed - - -@dataclass -class FolderState(Mapping): - test_case: TestCase - _work_dir: Path - _original_dir: Path - work_files: dict[str, FileRepresentation] - originals: dict[str, FileRepresentation] - - def __iter__(self): - yield from ('work_dir', 'original_dir') - - def __len__(self): - return 2 - - def __getitem__(self, key): - if key == 'work_dir': - return self._work_dir - elif key == 'original_dir': - return self._original_dir - else: - raise KeyError(key) - - def check(self, prefixed: tuple[int] = None, suck: tuple[int] = None): - """Checks the file changes - - :param prefixed: These files in the work dir are expected to be prefixed - :param suck: These files in the work dir are expected to be sucked from the originals - """ - [self.work_files[f"file_{i}"].prefixed() for i in prefixed or ()] - [self.work_files[f"file_{i}"].suck(self.originals[f"file_{i}"]) for i in suck or ()] - [f.check(self.test_case) for f in chain(self.work_files.values(), self.originals.values())] - - -def drun(action=None, execution=None, match=None, media=None, helper=None, **kw): - def _(l: list | dict): - if isinstance(l, list): - return {k: True for k in 
l} - return l - return Deduplidog(Action(**_(action or [])), - Execution(**_(execution or [])), - Match(**_(match or [])), - Media(**_(media or [])), - Helper(**_(helper or [])), - **kw).start() - - -class TestDeduplidog(TestCase): - - def prepare(self, testing_dir: str = None): - self.temp = mkdtemp() # TemporaryDirectory() NOTE - # temp = Path(testing_dir) if testing_dir else self.temp.name NOTE - temp = str(self.temp) - originals = Path(temp, "originals") - work_dir = Path(temp, "work_dir") - if not testing_dir: - originals.mkdir() - work_dir.mkdir() - - original_files = {name: FileRepresentation(originals / name).write() - for name in (f"file_{i}" for i in range(12))} - work_files = {name: FileRepresentation(work_dir / name, *rest).write() for name, *rest in ( - ("file_1", 0, 2), - ("file_2", 0, 3), - ("file_4", 3600), - ("file_5", 7200), - ("file_6", 3601), - ("file_7", 3599), - ("file_8", -3600), - ("file_9", -10), - ("file_10", -3600*24*365), - ("file_11", 0), - )} - - return FolderState(self, work_dir, originals, work_files, original_files) - - def test_simple_prefix(self): - state = self.prepare() - drun(["rename", "execute"], **state) - state.check(prefixed=(11,)) - - def test_date(self): - state = self.prepare() - drun(["rename", "execute"], ["neglect_warning"], ["ignore_date"], **state) - state.check(prefixed=(4, 5, 6, 7, 8, 9, 10, 11)) - state = self.prepare() - drun(["rename", "execute"], match=["ignore_date"], **state) - state.check(prefixed=(4, 5, 6, 7, 11)) - - state = self.prepare() - drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 1}, **state) - state.check(prefixed=(4, 7, 8, 9, 11)) - state = self.prepare() - drun(["rename", "execute"], match={"tolerate_hour": 1}, **state) - state.check(prefixed=(4, 7, 11)) - - state = self.prepare() - drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state) - state.check(prefixed=(4, 5, 6, 7, 8, 9, 11)) - state = self.prepare() - drun(["rename", "execute"], 
match={"tolerate_hour": 2}, **state) - state.check(prefixed=(4, 5, 6, 7, 11)) - - def test_replace_with_original(self): - state = self.prepare() - drun(["replace_with_original", "execute"], ["neglect_warning"], **state) - state.work_files["file_11"].suck(state.originals["file_11"]) - state.check() - - state = self.prepare() - drun(["replace_with_original", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state) - state.check(suck=(4, 5, 6, 7, 8, 9, 11)) - - def test_invert_selection(self): - state = self.prepare() - with self.assertRaises(AssertionError): - drun(["replace_with_original", "execute"], match={"tolerate_hour": 2, "invert_selection": True}, **state) - drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": False}, **state) - state.check(prefixed=(4, 5, 6, 7, 8, 9, 11)) - - state = self.prepare() - drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": True}, **state) - state.check(prefixed=(1, 2, 10)) - - # No media file in the test case. 
- # def test_skip_bigger(self): - # state = self.prepare() - # Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, `media_magic=True`) - # state.check() - - -if __name__ == '__main__': - main() diff --git a/tests/setup.py b/tests/setup.py index a487653..c542846 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -1,15 +1,154 @@ +import os +import random +import string +from collections.abc import Mapping +from dataclasses import dataclass, field +from itertools import chain +from pathlib import Path +from shutil import copytree, copy2 +from tempfile import mkdtemp +from typing import Self +from unittest import TestCase + from deduplidog import Deduplidog -from deduplidog.deduplidog import Action, Execution, Match, Media, Helper +from deduplidog.deduplidog import Change, Action, Execution, Helper, Match, Media + + +def drun(action=None, execution=None, match=None, media=None, helper=None, confirm_one_by_one=False, **kw): + def _(d: list | dict): + if isinstance(d, list): + return {k: True for k in d} + return d + # as confirm_one_by_one affects the testing, this option is lifted up here + exec = {"confirm_one_by_one": confirm_one_by_one} if confirm_one_by_one is not None else {} -def drun(action=None, execution=None, match=None, media=None, helper=None, **kw): - def _(l: list | dict): - if isinstance(l, list): - return {k: True for k in l} - return l return Deduplidog(Action(**_(action or [])), - Execution(**_(execution or [])), + Execution(**_(execution or {}) | exec), Match(**_(match or [])), Media(**_(media or [])), Helper(**_(helper or [])), **kw).start() + + +@dataclass +class FileReal: + path: Path + + def __post_init__(self): + self._mtime = self.path.stat().st_mtime + + def check(self, test: TestCase): + "Checks the disk whether it contains the file represented." 
+ test.assertTrue(self.path.exists(), msg=f"This file should exist: {self.path}") + test.assertEqual(self._mtime, self.path.stat().st_mtime, msg=self.path) + + def prefixed(self): + self.path = self.path.with_name("✓" + self.path.name) + + def suck(self, other: Self): + "Use the other file. Use its name, however stays in the current directory." + self.path = self.path.with_name(other.path.name) + self._mtime = other._mtime + + +@dataclass +class FileRepresentation(FileReal): + # path: Path + + mtime: int = 0 + "relative mtime" + text_seed: int = 1 + + def __post_init__(self): + self._mtime = round(self.path.parent.parent.stat().st_mtime + self.mtime) + + def write(self): + "Writes the representation to the disk." + self.path.write_text(self.get_text()) + os.utime(self.path, (self._mtime,)*2) + return self + + def check(self, test: TestCase): + super().check(test) + if self.path.suffix not in (".jpeg",): + test.assertEqual(self.get_text(), self.path.read_text(), msg=self.path) + + def get_text(self): + random.seed(self.text_seed) + return ''.join(random.choices(string.ascii_letters + string.digits, k=10+self.text_seed*10)) + + def suck(self, other: Self): + super().suck(other) + self.text_seed = other.text_seed + + +@dataclass +class FolderState(Mapping): + test_case: TestCase + _work_dir: Path + _original_dir: Path + work_files: dict[str, FileReal] = field(default_factory=lambda: {}) + originals: dict[str, FileReal] = field(default_factory=lambda: {}) + + def __post_init__(self): + def _(dir_: Path, files_: dict): + for file in dir_.rglob('*'): + if file.is_file(): + files_[str(file)] = FileReal(path=file) + + if not self.work_files: + _(self._work_dir, self.work_files) + if not self.originals: + _(self._original_dir, self.originals) + + def __iter__(self): + yield from ('work_dir', 'original_dir') + + def __len__(self): + return 2 + + def __getitem__(self, key): + if key == 'work_dir': + return self._work_dir + elif key == 'original_dir': + return 
self._original_dir + else: + raise KeyError(key) + + def check(self, prefixed: tuple[str] = None, suck: tuple[str] = None, prefixed_i: tuple[int] = None, suck_i: tuple[int] = None): + """Checks the file changes + + :param prefixed: These files in the work dir are expected to be prefixed + :param suck: These files in the work dir are expected to be sucked from the originals + :param prefixed_i: These file_{i} in the work dir are expected to be prefixed + :param suck_i: These file_{i} in the work dir are expected to be sucked from the originals + """ + [self.work_files[str(self._work_dir / f)].prefixed() for f in prefixed or ()] + [self.work_files[str(self._work_dir / f)].suck(self.originals[str(self._original_dir / f)]) for f in suck or ()] + + [self.work_files[f"file_{i}"].prefixed() for i in prefixed_i or ()] + [self.work_files[f"file_{i}"].suck(self.originals[f"file_{i}"]) for i in suck_i or ()] + + [f.check(self.test_case) for f in chain(self.work_files.values(), self.originals.values())] + + +class TestDisk(TestCase): + + def setUp(self): + self.disk2 = mkdtemp(dir="/tmp") + temp = str(self.disk2) + self.disk = Path(self.disk2) / "disk" + copytree("tests/test_data/disk", self.disk, copy_function=copy2) + + # assure a file 29 seconds ahead (because timestamp seems reset when uploading on a testing worker) + os.utime(self.disk / "folder1/dog1.jpg", (os.path.getmtime(self.disk / "folder2/dog1.jpg") - 29, ) * 2) + + # make a symlink + os.symlink(self.disk / "folder1/symlinkable.txt", self.disk / "folder2/symlinkable.txt") + + def log(self, log: list[Change], deduplidog: Deduplidog): + """ Check the deduplidog log output """ + # update the paths + for row, change in zip(log, deduplidog.changes): + self.assertDictEqual({self.disk / path: changes for path, changes in row.items()}, change) diff --git a/tests/test_data/disk/folder1/1.txt b/tests/test_data/disk/folder1/1.txt new file mode 100644 index 0000000..5626abf --- /dev/null +++ 
b/tests/test_data/disk/folder1/1.txt @@ -0,0 +1 @@ +one diff --git a/tests/test_data/disk/folder1/2.txt b/tests/test_data/disk/folder1/2.txt new file mode 100644 index 0000000..f719efd --- /dev/null +++ b/tests/test_data/disk/folder1/2.txt @@ -0,0 +1 @@ +two diff --git a/tests/test_data/disk/folder1/dog1.jpg b/tests/test_data/disk/folder1/dog1.jpg new file mode 100644 index 0000000..0cdea87 Binary files /dev/null and b/tests/test_data/disk/folder1/dog1.jpg differ diff --git a/tests/test_data/disk/folder1/dog2.mp4 b/tests/test_data/disk/folder1/dog2.mp4 new file mode 100644 index 0000000..00c5778 Binary files /dev/null and b/tests/test_data/disk/folder1/dog2.mp4 differ diff --git a/tests/test_data/disk/folder1/dog2_smaller.jpg b/tests/test_data/disk/folder1/dog2_smaller.jpg new file mode 100644 index 0000000..58f0903 Binary files /dev/null and b/tests/test_data/disk/folder1/dog2_smaller.jpg differ diff --git a/tests/test_data/disk/folder1/symlinkable.txt b/tests/test_data/disk/folder1/symlinkable.txt new file mode 100644 index 0000000..e44b758 --- /dev/null +++ b/tests/test_data/disk/folder1/symlinkable.txt @@ -0,0 +1 @@ +symlinked content diff --git a/tests/test_data/disk/folder1/unique.txt b/tests/test_data/disk/folder1/unique.txt new file mode 100644 index 0000000..96ab826 --- /dev/null +++ b/tests/test_data/disk/folder1/unique.txt @@ -0,0 +1 @@ +this file is unique diff --git a/tests/test_data/disk/folder2/1-different-name.txt b/tests/test_data/disk/folder2/1-different-name.txt new file mode 100644 index 0000000..5626abf --- /dev/null +++ b/tests/test_data/disk/folder2/1-different-name.txt @@ -0,0 +1 @@ +one diff --git a/tests/test_data/disk/folder2/2.txt b/tests/test_data/disk/folder2/2.txt new file mode 100644 index 0000000..f719efd --- /dev/null +++ b/tests/test_data/disk/folder2/2.txt @@ -0,0 +1 @@ +two diff --git a/tests/test_data/disk/folder2/dog1.jpg b/tests/test_data/disk/folder2/dog1.jpg new file mode 100644 index 0000000..83ae9db Binary files /dev/null 
and b/tests/test_data/disk/folder2/dog1.jpg differ diff --git a/tests/test_data/disk/folder2/dog1_other_name.jpg b/tests/test_data/disk/folder2/dog1_other_name.jpg new file mode 100644 index 0000000..83ae9db Binary files /dev/null and b/tests/test_data/disk/folder2/dog1_other_name.jpg differ diff --git a/tests/test_data/disk/folder2/folder2.1/1.txt b/tests/test_data/disk/folder2/folder2.1/1.txt new file mode 100644 index 0000000..5626abf --- /dev/null +++ b/tests/test_data/disk/folder2/folder2.1/1.txt @@ -0,0 +1 @@ +one diff --git a/tests/test_data/disk/folder2/folder2.1/dog2.jpg b/tests/test_data/disk/folder2/folder2.1/dog2.jpg new file mode 100644 index 0000000..b514cf7 Binary files /dev/null and b/tests/test_data/disk/folder2/folder2.1/dog2.jpg differ diff --git a/tests/test_data/disk/folder2/folder2.1/dog2.mp4 b/tests/test_data/disk/folder2/folder2.1/dog2.mp4 new file mode 100644 index 0000000..276338c Binary files /dev/null and b/tests/test_data/disk/folder2/folder2.1/dog2.mp4 differ diff --git a/tests/test_deduplidog.py b/tests/test_deduplidog.py index 8912be5..692d9e6 100644 --- a/tests/test_deduplidog.py +++ b/tests/test_deduplidog.py @@ -1,85 +1,10 @@ -from collections.abc import Mapping -from dataclasses import dataclass -from itertools import chain -import os from pathlib import Path from tempfile import TemporaryDirectory, mkdtemp from typing import Self from unittest import TestCase, main -import random -import string -from tests.setup import drun - - -@dataclass -class FileRepresentation: - path: Path - mtime: int = 0 - "relative mtime" - text_seed: int = 1 - - def __post_init__(self): - self._mtime = round(self.path.parent.parent.stat().st_mtime + self.mtime) - - def write(self): - "Writes the representation to the disk." - self.path.write_text(self.get_text()) - os.utime(self.path, (self._mtime,)*2) - return self - - def check(self, test: TestCase): - "Checks the disk whether it contains the file represented." 
-        test.assertTrue(self.path.exists(), msg=self.path)
-        test.assertEqual(self.get_text(), self.path.read_text(), msg=self.path)
-        test.assertEqual(self._mtime, self.path.stat().st_mtime, msg=self.path)
-
-    def get_text(self):
-        random.seed(self.text_seed)
-        return ''.join(random.choices(string.ascii_letters + string.digits, k=10+self.text_seed*10))
-
-    def prefixed(self):
-        self.path = self.path.with_name("✓" + self.path.name)
-
-    def suck(self, other: Self):
-        "Use the other file. Use its name, however stays in the current directory."
-        self.path = self.path.with_name(other.path.name)
-        self._mtime = other._mtime
-        self.text_seed = other.text_seed
-
-
-@dataclass
-class FolderState(Mapping):
-    test_case: TestCase
-    _work_dir: Path
-    _original_dir: Path
-    work_files: dict[str, FileRepresentation]
-    originals: dict[str, FileRepresentation]
-
-    def __iter__(self):
-        yield from ('work_dir', 'original_dir')
-
-    def __len__(self):
-        return 2
-
-    def __getitem__(self, key):
-        if key == 'work_dir':
-            return self._work_dir
-        elif key == 'original_dir':
-            return self._original_dir
-        else:
-            raise KeyError(key)
-
-    def check(self, prefixed: tuple[int] = None, suck: tuple[int] = None):
-        """Checks the file changes
-
-        :param prefixed: These files in the work dir are expected to be prefixed
-        :param suck: These files in the work dir are expected to be sucked from the originals
-        """
-        [self.work_files[f"file_{i}"].prefixed() for i in prefixed or ()]
-        [self.work_files[f"file_{i}"].suck(self.originals[f"file_{i}"]) for i in suck or ()]
-        [f.check(self.test_case) for f in chain(self.work_files.values(), self.originals.values())]
+from tests.setup import FileRepresentation, FolderState, drun
 
 
 class TestDeduplidog(TestCase):
@@ -114,29 +39,29 @@ def prepare(self, testing_dir: str = None):
     def test_simple_prefix(self):
         state = self.prepare()
         drun(["rename", "execute"], **state)
-        state.check(prefixed=(11,))
+        state.check(prefixed_i=(11,))
 
     def test_date(self):
         state = self.prepare()
         drun(["rename", "execute"], ["neglect_warning"], ["ignore_date"], **state)
-        state.check(prefixed=(4, 5, 6, 7, 8, 9, 10, 11))
+        state.check(prefixed_i=(4, 5, 6, 7, 8, 9, 10, 11))
         state = self.prepare()
         drun(["rename", "execute"], match=["ignore_date"], **state)
-        state.check(prefixed=(4, 5, 6, 7, 11))
+        state.check(prefixed_i=(4, 5, 6, 7, 11))
 
         state = self.prepare()
         drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 1}, **state)
-        state.check(prefixed=(4, 7, 8, 9, 11))
+        state.check(prefixed_i=(4, 7, 8, 9, 11))
         state = self.prepare()
         drun(["rename", "execute"], match={"tolerate_hour": 1}, **state)
-        state.check(prefixed=(4, 7, 11))
+        state.check(prefixed_i=(4, 7, 11))
 
         state = self.prepare()
         drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
-        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))
+        state.check(prefixed_i=(4, 5, 6, 7, 8, 9, 11))
         state = self.prepare()
         drun(["rename", "execute"], match={"tolerate_hour": 2}, **state)
-        state.check(prefixed=(4, 5, 6, 7, 11))
+        state.check(prefixed_i=(4, 5, 6, 7, 11))
 
     def test_replace_with_original(self):
         state = self.prepare()
@@ -146,24 +71,18 @@ def test_replace_with_original(self):
 
         state = self.prepare()
         drun(["replace_with_original", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
-        state.check(suck=(4, 5, 6, 7, 8, 9, 11))
+        state.check(suck_i=(4, 5, 6, 7, 8, 9, 11))
 
     def test_invert_selection(self):
         state = self.prepare()
         with self.assertRaises(AssertionError):
             drun(["replace_with_original", "execute"], match={"tolerate_hour": 2, "invert_selection": True}, **state)
         drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": False}, **state)
-        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))
+        state.check(prefixed_i=(4, 5, 6, 7, 8, 9, 11))
 
         state = self.prepare()
         drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": True}, **state)
-        state.check(prefixed=(1, 2, 10))
-
-    # No media file in the test case.
-    # def test_skip_bigger(self):
-    #     state = self.prepare()
-    #     Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, `media_magic=True`)
-    #     state.check()
+        state.check(prefixed_i=(1, 2, 10))
 
 
 if __name__ == '__main__':
diff --git a/tests/test_disk.py b/tests/test_disk.py
new file mode 100644
index 0000000..5de4681
--- /dev/null
+++ b/tests/test_disk.py
@@ -0,0 +1,27 @@
+
+from pathlib import Path
+from tests.setup import FolderState, TestDisk, drun
+
+
+class TestSymlinked(TestDisk):
+    def setUp(self):
+        super().setUp()
+
+    def test_basic(self):
+        state = FolderState(self, self.disk / "folder1", self.disk / "folder2")
+        # We have to ignore the date as the remote checkout resets the mtime
+        d = drun(["rename", "execute"], [], ["ignore_date"], [], **state)
+        state.check(prefixed=("2.txt", "1.txt"))
+        self.log([
+            {"folder1/2.txt": ["renaming"], "folder2/2.txt": []},
+            {"folder1/1.txt": ["renaming"], "folder2/folder2.1/1.txt": []},
+        ], d)
+
+    def test_reverse(self):
+        state = FolderState(self, self.disk / "folder2", self.disk / "folder1")
+        d = drun(["rename", "execute"], [], ["ignore_date"], [], **state)
+        state.check(prefixed=("folder2.1/1.txt", "2.txt"))
+        self.log([
+            {"folder2/2.txt": ["renaming"], "folder1/2.txt": []},
+            {"folder2/folder2.1/1.txt": ["renaming"], "folder1/1.txt": []},
+        ], d)
diff --git a/tests/test_media_magic.py b/tests/test_media_magic.py
new file mode 100644
index 0000000..21a04c2
--- /dev/null
+++ b/tests/test_media_magic.py
@@ -0,0 +1,56 @@
+from unittest import main
+
+from tests.setup import FolderState, drun, TestDisk
+
+
+class TestMediaMagicTwoFolders(TestDisk):
+    def setUp(self):
+        super().setUp()
+        self.state = FolderState(self, self.disk / "folder2", self.disk / "folder1")
+
+    def test_basic(self):
+        d = drun(["rename", "execute"], ["neglect_warning"], [], ["media_magic"], **self.state)
+        self.state.check(prefixed=("dog1.jpg", "folder2.1/dog2.mp4"))
+
+        self.log([{'folder1/dog1.jpg': [], 'folder2/dog1.jpg': ['renaming']},
+                  {"folder2/folder2.1/dog2.mp4": ['SIZE WARNING 77.5 kB', 'renaming'], 'folder1/dog2.mp4': [], }], d)
+
+    def test_ignore_name(self):
+        d = drun(["rename", "execute"], ["neglect_warning"], ["ignore_name"], ["media_magic"], **self.state)
+        self.state.check(prefixed=("dog1.jpg", "dog1_other_name.jpg", "folder2.1/dog2.mp4"))
+
+    def test_ignore_name_small_hash(self):
+        # dog2.jpg in the work dir is bigger than the one in the originals dir
+        # the warning should be triggered
+        # Note that it needs bigger hash difference to be detected.
+        d = drun(["rename", "execute"], [], ["ignore_name"],
+                 {"media_magic": True, "accepted_img_hash_diff": 3}, **self.state)
+        self.log([{"folder2.1/dog2.jpg": ['SIZE WARNING 195.5 kB', 'renaming']}], d)
+        self.state.check(prefixed=("dog1.jpg", "dog1_other_name.jpg"))
+
+    def test_ignore_name_small_hash(self):
+        drun(["rename", "execute"], ["neglect_warning"], ["ignore_name"],
+             {"media_magic": True, "accepted_img_hash_diff": 3}, **self.state)
+        self.state.check(prefixed=("dog1.jpg", "dog1_other_name.jpg", "folder2.1/dog2.jpg", "folder2.1/dog2.mp4"))
+
+
+class TestMediaMagicSwapped(TestDisk):
+    def setUp(self):
+        super().setUp()
+        self.state = FolderState(self, self.disk / "folder1", self.disk / "folder2")
+
+    def test_basic(self):
+        d = drun(["rename", "execute"], ["neglect_warning"], [], ["media_magic"], **self.state)
+        self.state.check(prefixed=("dog1.jpg", "dog2.mp4"))
+        self.log([{"folder1/dog2.mp4": ["renaming"], "folder2/folder2.1/dog2.mp4": []},
+                  {"folder1/dog1.jpg": ['SIZE WARNING 75.2 kB', 'renaming'], "folder2/dog1.jpg": ['DATE WARNING + 29 seconds']}, ], d)
+
+    # NOTE add
+    # def test_skip_bigger(self):
+    #     state = self.prepare()
+    #     Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, `media_magic=True`)
+    #     state.check()
+
+
+if __name__ == '__main__':
+    main()