diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1cb7739..6a4abf5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - id: check-json - exclude: ".vscode(.dist)?/.*" + exclude: "(.vscode(.dist)?/.*)|(tests/test_dict.json)" - id: check-yaml - id: check-builtin-literals - id: check-case-conflict diff --git a/src/consts.py b/src/consts.py index ac423cc..ea884c2 100644 --- a/src/consts.py +++ b/src/consts.py @@ -1,3 +1,21 @@ +import dataclasses +from pathlib import Path + +from ankiutils.consts import AddonConsts as BaseAddonConsts from ankiutils.consts import get_consts -consts = get_consts(__name__) + +@dataclasses.dataclass +class AddonConsts(BaseAddonConsts): + userfiles_dir: Path + dicts_dir: Path + icons_dir: Path + + +base_consts = get_consts(__name__) +consts = AddonConsts( + **dataclasses.asdict(base_consts), + userfiles_dir=base_consts.dir / "user_files", + dicts_dir=base_consts.dir / "user_files" / "dictionaries", + icons_dir=base_consts.dir / "icons" +) diff --git a/src/fetcher.py b/src/fetcher.py index 4095d86..00964b5 100644 --- a/src/fetcher.py +++ b/src/fetcher.py @@ -1,9 +1,10 @@ from __future__ import annotations -import functools import json +import shutil +import sqlite3 from pathlib import Path -from typing import Callable +from typing import Any, Callable class WiktionaryError(Exception): @@ -16,10 +17,34 @@ class WordNotFoundError(WiktionaryError): class WiktionaryFetcher: def __init__(self, dictionary: str, base_dir: Path): - self.dict_dir = base_dir / dictionary + self.db_path = base_dir / f"{dictionary}.db" + self._connection = sqlite3.connect(self.db_path, check_same_thread=False) + self._connection.executescript( + """ + CREATE TABLE IF NOT EXISTS words ( + word text, + data text + ); + CREATE INDEX IF NOT EXISTS index_word ON words(word); + """ + ) + + def close(self) -> None: + self._connection.close() + + def __enter__(self) -> WiktionaryFetcher: + return self + + def __exit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None: + self.close() + + def _add_word(self, word: str, data: str) -> None: + self._connection.execute( + "INSERT INTO words(word, data) values(?, ?)", (word, data) + ) @classmethod - def dump_kaikki_dict( + def import_kaikki_dict( cls, filename: str | Path, dictionary: str, @@ -28,44 +53,42 @@ def dump_kaikki_dict( base_dir: Path, ) -> int: """Dumps a JSON file downloaded from https://kaikki.org/dictionary/{lang}/ - to separate files for each entry in 'dictionary'""" - outdir = base_dir / dictionary - outdir.mkdir(exist_ok=True) + to a SQLite database""" + base_dir.mkdir(exist_ok=True) count = 0 with open(filename, encoding="utf-8") as file: - for i, line in enumerate(file): - entry = json.loads(line) - word = entry["word"] - try: - with open( - outdir / f"{word}.json", - mode="w", - encoding="utf-8", - ) as outfile: - outfile.write(line) + with WiktionaryFetcher(dictionary, base_dir) as fetcher: + for i, line in enumerate(file): + try: + entry = json.loads(line) + word = entry["word"] + fetcher._add_word(word, line) count += 1 - except Exception as exc: - on_error(word, exc) - if i % 50 == 0: - if not on_progress(i + 1): - break + except Exception as exc: + print(f"{exc=}") + on_error(word, exc) + if i % 50 == 0: + if not on_progress(i + 1): + break + fetcher._connection.commit() return count - @staticmethod - @functools.lru_cache - def _get_word_json(dict_dir: Path, word: str) -> dict: - # TODO: handle words with multiple word senses - - try: - with open(dict_dir / f"{word}.json", encoding="utf-8") as file: - return json.load(file) - except FileNotFoundError as exc: - raise WordNotFoundError( - f'"{word}" was not found in the dictionary.' - ) from exc + @classmethod + def migrate_dict_to_sqlite(cls, dictionary_dir: Path, new_dir: Path) -> None: + with WiktionaryFetcher(dictionary_dir.name, new_dir) as fetcher: + for file in dictionary_dir.iterdir(): + fetcher._add_word(file.stem, file.read_text(encoding="utf-8")) + fetcher._connection.commit() + shutil.rmtree(dictionary_dir) def get_word_json(self, word: str) -> dict: - return self._get_word_json(self.dict_dir, word) + # TODO: handle words with multiple word senses + row = self._connection.execute( + "SELECT data FROM words WHERE word = ?", (word,) + ).fetchone() + if not row: + raise WordNotFoundError(f'"{word}" was not found in the dictionary.') + return json.loads(row[0]) def get_senses(self, word: str) -> list[str]: data = self.get_word_json(word) diff --git a/src/gui/importer.py b/src/gui/importer.py index 964ec50..6bc6791 100644 --- a/src/gui/importer.py +++ b/src/gui/importer.py @@ -2,6 +2,8 @@ import os import re +import sys +import traceback from concurrent.futures import Future from typing import TYPE_CHECKING @@ -89,6 +91,7 @@ def on_done(future: Future) -> None: try: count = future.result() except Exception as exc: + traceback.print_exception(None, exc, exc.__traceback__, file=sys.stdout) showWarning(str(exc), parent=self, title=consts.name) return tooltip(f"Successfully imported {count} words", parent=self.mw) @@ -101,13 +104,13 @@ def on_done(future: Future) -> None: return self.mw.progress.start(label="Starting importing...", parent=self) self.mw.progress.set_title(f"{consts.name} - Importing a dictionary") - # TODO: handle exceptions self.mw.taskman.run_in_background( - lambda: WiktionaryFetcher.dump_kaikki_dict( + lambda: WiktionaryFetcher.import_kaikki_dict( filename, name, on_progress=on_progress, on_error=on_error, + base_dir=consts.dicts_dir, ), on_done=on_done, ) diff --git a/src/gui/main.py b/src/gui/main.py index c0a05b9..34e7346 100644 --- a/src/gui/main.py +++ b/src/gui/main.py @@ -2,7 +2,7 @@ import os import time -from typing import TYPE_CHECKING, Any, Callable, cast +from typing import TYPE_CHECKING, Any, Callable from urllib.parse import unquote import requests @@ -21,6 +21,7 @@ from ..consts import consts from ..fetcher import WiktionaryFetcher, WordNotFoundError +from ..utils import get_dict_names if TYPE_CHECKING or qtmajor > 5: from ..forms.main_qt6 import Ui_Dialog @@ -31,10 +32,6 @@ PROGRESS_LABEL = "Updated {count} out of {total} note(s)" -def get_available_dicts() -> list[str]: - return [p.name for p in (consts.dir / "user_files").iterdir() if p.is_dir()] - - class WiktionaryFetcherDialog(QDialog): def __init__( self, @@ -61,9 +58,9 @@ def __init__( ] self.setWindowTitle(consts.name) self.form.icon.setPixmap( - QPixmap(os.path.join(consts.dir, "icons", "enwiktionary-1.5x.png")) + QPixmap(os.path.join(consts.icons_dir, "enwiktionary-1.5x.png")) ) - self.form.dictionaryComboBox.addItems(get_available_dicts()) + self.form.dictionaryComboBox.addItems(get_dict_names()) self.downloader: WiktionaryFetcher | None = None qconnect(self.form.addButton.clicked, self.on_add) self.form.addButton.setShortcut(QKeySequence("Ctrl+Return")) @@ -153,8 +150,7 @@ def on_add(self) -> None: textFormat="rich", ) return - dictionary = self.form.dictionaryComboBox.currentText() - self.downloader = WiktionaryFetcher(dictionary, consts.dir / "user_files") + dictionary_name = self.form.dictionaryComboBox.currentText() word_field = self.form.wordFieldComboBox.currentText() definition_field_i = self.form.definitionFieldComboBox.currentIndex() example_field_i = self.form.exampleFieldComboBox.currentIndex() @@ -186,6 +182,7 @@ def on_failure(exc: Exception) -> None: op = QueryOp( parent=self, op=lambda col: self._fill_notes( + dictionary_name, word_field, field_tuples, ), @@ -203,6 +200,7 @@ def on_failure(exc: Exception) -> None: def _fill_notes( self, + dictionary_name: str, word_field: str, field_tuples: tuple[tuple[int, Callable[[str], str]], ...], ) -> None: @@ -222,33 +220,33 @@ def on_progress() -> None: max=len(self.notes), ) - for note in self.notes: - word = strip_html(note[word_field]).strip() - if not word: - continue - need_updating = False - try: - for field_tuple in field_tuples: - if not field_tuple[0]: - continue - contents = field_tuple[1](word) - note[self.field_names[field_tuple[0]]] = contents - need_updating = True - except WordNotFoundError as exc: - self.errors.append(str(exc)) - finally: - if need_updating: - self.updated_notes.append(note) - if time.time() - last_progress >= 0.01: - self.mw.taskman.run_on_main(on_progress) - last_progress = time.time() - if want_cancel: - break + with WiktionaryFetcher(dictionary_name, consts.dicts_dir) as fetcher: + for note in self.notes: + word = strip_html(note[word_field]).strip() + if not word: + continue + need_updating = False + try: + for field_tuple in field_tuples: + if not field_tuple[0]: + continue + contents = field_tuple[1](fetcher, word) + note[self.field_names[field_tuple[0]]] = contents + need_updating = True + except WordNotFoundError as exc: + self.errors.append(str(exc)) + finally: + if need_updating: + self.updated_notes.append(note) + if time.time() - last_progress >= 0.01: + self.mw.taskman.run_on_main(on_progress) + last_progress = time.time() + if want_cancel: + break self.mw.taskman.run_on_main(self.mw.progress.finish) - def _get_definitions(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - defs = downloader.get_senses(word) + def _get_definitions(self, fetcher: WiktionaryFetcher, word: str) -> str: + defs = fetcher.get_senses(word) if len(defs) == 0: return "" if len(defs) == 1: @@ -259,9 +257,8 @@ def _get_definitions(self, word: str) -> str: formatted += "" return formatted - def _get_examples(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - examples = downloader.get_examples(word) + def _get_examples(self, fetcher: WiktionaryFetcher, word: str) -> str: + examples = fetcher.get_examples(word) if len(examples) == 0: return "" if len(examples) == 1: @@ -272,21 +269,17 @@ def _get_examples(self, word: str) -> str: formatted += "" return formatted - def _get_gender(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - return downloader.get_gender(word) + def _get_gender(self, fetcher: WiktionaryFetcher, word: str) -> str: + return fetcher.get_gender(word) - def _get_part_of_speech(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - return downloader.get_part_of_speech(word) + def _get_part_of_speech(self, fetcher: WiktionaryFetcher, word: str) -> str: + return fetcher.get_part_of_speech(word) - def _get_ipa(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - return downloader.get_ipa(word) + def _get_ipa(self, fetcher: WiktionaryFetcher, word: str) -> str: + return fetcher.get_ipa(word) - def _get_audio(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - url = downloader.get_audio_url(word) + def _get_audio(self, fetcher: WiktionaryFetcher, word: str) -> str: + url = fetcher.get_audio_url(word) http_session = requests.Session() # https://meta.wikimedia.org/wiki/User-Agent_policy @@ -302,13 +295,11 @@ def _get_audio(self, word: str) -> str: filename = self.mw.col.media.write_data(unquote(os.path.basename(url)), data) return "[sound:" + filename + "]" - def _get_etymology(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - return downloader.get_etymology(word) + def _get_etymology(self, fetcher: WiktionaryFetcher, word: str) -> str: + return fetcher.get_etymology(word) - def _get_declension(self, word: str) -> str: - downloader = cast(WiktionaryFetcher, self.downloader) - declensions = downloader.get_declension(word) + def _get_declension(self, fetcher: WiktionaryFetcher, word: str) -> str: + declensions = fetcher.get_declension(word) if len(declensions) == 0: return "" formatted = "