diff --git a/pyproject.toml b/pyproject.toml
index 0035f724a..49e2d6ea0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,7 +76,12 @@ fixable = ["ALL"]
[tool.ruff.lint.per-file-ignores]
"test*.py" = ["ARG002", "PLR2004"]
-"conftest.py" = ["ARG002"]
+"conftest.py" = ["ARG001", "ARG002"]
+"common_helper.py" = ["ARG002"]
+# ignore prints in CLI scripts
+"migrate_db_to_postgresql.py" = ["T201"]
+"manage_users.py" = ["T201"]
+"migrate_database.py" = ["T201"]
[tool.ruff.lint.isort]
known-first-party = ["analysis", "compare", "helperFunctions", "install", "intercom", "objects", "plugins", "scheduler",
diff --git a/src/config.py b/src/config.py
index 3d53323fc..7d6048df3 100644
--- a/src/config.py
+++ b/src/config.py
@@ -186,8 +186,10 @@ def load(path: str | Path | None = None):
Frontend.model_rebuild()
if path is None:
path = Path(__file__).parent / 'config/fact-core-config.toml'
+ elif isinstance(path, str):
+ path = Path(path)
- with open(path, encoding='utf8') as f: # noqa: PTH123
+ with path.open(encoding='utf8') as f:
cfg = toml.load(f)
_replace_hyphens_with_underscores(cfg)
diff --git a/src/conftest.py b/src/conftest.py
index 819c82148..6c17c1310 100644
--- a/src/conftest.py
+++ b/src/conftest.py
@@ -29,7 +29,7 @@ def docker_mount_base_dir() -> str:
@pytest.fixture
-def _firmware_file_storage_directory() -> str: # noqa: PT005
+def firmware_file_storage_directory() -> str:
with TemporaryDirectory(prefix='fact-firmware-file-storage-directory') as tmp_dir:
yield tmp_dir
@@ -98,11 +98,11 @@ def common_config(request, docker_mount_base_dir) -> config.Common:
@pytest.fixture
-def backend_config(request, common_config, _firmware_file_storage_directory) -> config.Backend:
+def backend_config(request, common_config, firmware_file_storage_directory) -> config.Backend:
overwrite_config = merge_markers(request, 'backend_config_overwrite', dict)
test_config = {
- 'firmware_file_storage_directory': _firmware_file_storage_directory,
+ 'firmware_file_storage_directory': firmware_file_storage_directory,
'block_delay': 0.1,
'ssdeep_ignore': 1,
'intercom_poll_delay': 1.0,
@@ -156,7 +156,7 @@ def frontend_config(request, common_config) -> config.Frontend:
@pytest.fixture(autouse=True)
-def patch_config(monkeypatch, common_config, backend_config, frontend_config): # noqa: PT004
+def _patch_config(monkeypatch, common_config, backend_config, frontend_config):
"""This fixture will replace :py:data`config.common`, :py:data:`config.backend` and :py:data:`config.frontend`
with the default test config.
@@ -194,7 +194,7 @@ class AnalysisPluginTestConfig(BaseModel):
@pytest.fixture
-def analysis_plugin(request, patch_config): # noqa: ARG001
+def analysis_plugin(request, _patch_config):
"""Returns an instance of an AnalysisPlugin.
This fixture can be configured by the supplying an instance of ``AnalysisPluginTestConfig`` as marker of the same
name.
diff --git a/src/helperFunctions/yara_binary_search.py b/src/helperFunctions/yara_binary_search.py
index bad06d8fd..89a95c654 100644
--- a/src/helperFunctions/yara_binary_search.py
+++ b/src/helperFunctions/yara_binary_search.py
@@ -28,7 +28,7 @@ def __init__(self):
self.db = DbInterfaceCommon()
self.fs_organizer = FSOrganizer()
- def _execute_yara_search(self, rule_file_path: str, target_path: str | None = None) -> str:
+ def _execute_yara_search(self, rule_file_path: str | Path, target_path: str | Path | None = None) -> str:
"""
Scans the (whole) db directory with the provided rule file and returns the (raw) results.
Yara-python cannot be used, because it (currently) supports single-file scanning only.
diff --git a/src/install/backend.py b/src/install/backend.py
index cb21a4779..d22660eb4 100644
--- a/src/install/backend.py
+++ b/src/install/backend.py
@@ -148,7 +148,7 @@ def _install_yara():
raise InstallationError(f'Error on yara extraction.\n{unzip_process.stdout}')
yara_folder = [p for p in Path().iterdir() if p.name.startswith('yara-')][0]
with OperateInDirectory(yara_folder.name, remove=True):
- os.chmod('bootstrap.sh', 0o775) # noqa: PTH101
+ Path('bootstrap.sh').chmod(0o775)
for command in ['./bootstrap.sh', './configure --enable-magic', 'make -j$(nproc)', 'sudo make install']:
cmd_process = subprocess.run(command, shell=True, stdout=PIPE, stderr=STDOUT, text=True, check=False)
if cmd_process.returncode != 0:
diff --git a/src/manage_users.py b/src/manage_users.py
index 2ac996855..47e58f444 100755
--- a/src/manage_users.py
+++ b/src/manage_users.py
@@ -3,7 +3,6 @@
import argparse
import getpass
import sys
-from pathlib import Path
from flask_security import hash_password
from prompt_toolkit import PromptSession
@@ -58,7 +57,7 @@ def __init__(self, session, app, store, db):
@staticmethod
def help():
- print( # noqa: T201
+ print(
'\nOne of the following actions can be chosen:\n'
'\n\t[add_role_to_user]\tadd existing role to an existing user'
'\n\t[create_role]\t\tcreate new role'
@@ -94,7 +93,7 @@ def create_user(self):
while True:
password = getpass.getpass('password: ')
if not password_is_legal(password):
- print('Password is not legal. Please choose another password.') # noqa: T201
+ print('Password is not legal. Please choose another password.')
continue
break
with self.app.app_context():
@@ -172,14 +171,14 @@ def get_apikey_for_user(self):
user = self.store.find_user(email=user)
apikey = user.api_key
- print(f'key: {apikey}') # noqa: T201
+ print(f'key: {apikey}')
def list_all_users(self):
user_list = self.store.list_users()
for user in user_list:
user_roles = ', '.join([role.name for role in user.roles])
- print(f'\n\t{user.email} ({user_roles})') # noqa: T201
- print() # noqa: T201
+ print(f'\n\t{user.email} ({user_roles})')
+ print()
@staticmethod
def exit():
@@ -198,8 +197,8 @@ def initialise_roles(app, interface, db):
def prompt_loop(app, store, db, session):
- print(FACT_ASCII_ART) # noqa: T201
- print('\nWelcome to the FACT User Management (FACTUM)\n') # noqa: T201
+ print(FACT_ASCII_ART)
+ print('\nWelcome to the FACT User Management (FACTUM)\n')
initialise_roles(app, store, db)
actions = Actions(session, app, store, db)
@@ -218,13 +217,13 @@ def prompt_loop(app, store, db, session):
acting_function()
except KeyboardInterrupt:
- print('returning to action selection') # noqa: T201
+ print('returning to action selection')
except AssertionError as assertion_error:
- print(f'error: {assertion_error}') # noqa: T201
+ print(f'error: {assertion_error}')
except EOFError:
break
- print('\nQuitting ..') # noqa: T201
+ print('\nQuitting ..')
def start_user_management(app, store, db, session):
@@ -235,7 +234,7 @@ def start_user_management(app, store, db, session):
def main():
args = setup_argparse()
- config.load(Path(args.config_file))
+ config.load(args.config_file)
app = create_app()
user_db, user_datastore = add_flask_security_to_app(app)
diff --git a/src/migrate_database.py b/src/migrate_database.py
index 076491a6c..6ed87b885 100755
--- a/src/migrate_database.py
+++ b/src/migrate_database.py
@@ -35,7 +35,7 @@ def upgrade(cur):
cur.execute('DROP TABLE "user"')
cur.execute('ALTER TABLE "user_tmp" RENAME TO "user"')
- print('Successfully upgraded the database') # noqa: T201
+ print('Successfully upgraded the database')
def downgrade(cur):
@@ -60,7 +60,7 @@ def downgrade(cur):
cur.execute('DROP TABLE "user"')
cur.execute('ALTER TABLE "user_tmp" RENAME TO "user"')
- print('Successfully downgraded the database') # noqa: T201
+ print('Successfully downgraded the database')
def main():
diff --git a/src/migrate_db_to_postgresql.py b/src/migrate_db_to_postgresql.py
index 011ecac01..d950e39f4 100644
--- a/src/migrate_db_to_postgresql.py
+++ b/src/migrate_db_to_postgresql.py
@@ -22,7 +22,7 @@
try:
from rich.progress import BarColumn, Progress, TimeElapsedColumn
except ImportError:
- print('Error: rich not found. Please install it:\npython3 -m pip install rich') # noqa: T201
+ print('Error: rich not found. Please install it:\npython3 -m pip install rich')
sys.exit(1)
PERCENTAGE = '[progress.percentage]{task.percentage:>3.0f}%'
@@ -299,9 +299,9 @@ def main():
migrator = DbMigrator(postgres=postgres, mongo=db, progress=progress)
migrated_fw_count = migrator.migrate_fw(query={}, root=True, label='firmwares')
if not migrated_fw_count:
- print('No firmware to migrate') # noqa: T201
+ print('No firmware to migrate')
else:
- print(f'Successfully migrated {migrated_fw_count} firmware DB entries') # noqa: T201
+ print(f'Successfully migrated {migrated_fw_count} firmware DB entries')
migrate_comparisons(db)
except errors.ServerSelectionTimeoutError:
logging.error(
@@ -387,9 +387,9 @@ def migrate_comparisons(mongo: MigrationMongoInterface):
compare_db.insert_comparison(comparison_id, results)
count += 1
if not count:
- print('No firmware comparison entries to migrate') # noqa: T201
+ print('No firmware comparison entries to migrate')
else:
- print(f'Migrated {count} comparison DB entries') # noqa: T201
+ print(f'Migrated {count} comparison DB entries')
if __name__ == '__main__':
diff --git a/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py b/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py
index ca3a1b07f..89af0d208 100644
--- a/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py
+++ b/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py
@@ -1,12 +1,10 @@
-from pathlib import Path
-
import pytest
from test.common_helper import get_test_data_dir
from ..code.binwalk import AnalysisPlugin
-TEST_FILE = Path(get_test_data_dir()) / 'container' / 'test.zip'
+TEST_FILE = get_test_data_dir() / 'container' / 'test.zip'
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
diff --git a/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py b/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py
index 30c0c7734..f74b20b63 100644
--- a/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py
+++ b/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py
@@ -9,7 +9,7 @@
from ..code.elf_analysis import AnalysisPlugin
-TEST_DATA = Path(get_test_data_dir(), 'test_data_file.bin')
+TEST_DATA = get_test_data_dir() / 'test_data_file.bin'
TEST_DATA_DIR = Path(__file__).parent / 'data'
diff --git a/src/plugins/analysis/hardware_analysis/test/test_hardware_analysis.py b/src/plugins/analysis/hardware_analysis/test/test_hardware_analysis.py
index e77aa6dda..720e2ca35 100644
--- a/src/plugins/analysis/hardware_analysis/test/test_hardware_analysis.py
+++ b/src/plugins/analysis/hardware_analysis/test/test_hardware_analysis.py
@@ -1,14 +1,9 @@
-from pathlib import Path
-
import pytest
from objects.file import FileObject
-from test.common_helper import get_test_data_dir
from ..code.hardware_analysis import AnalysisPlugin
-TEST_DATA = Path(get_test_data_dir())
-
@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestHardwareAnalysis:
diff --git a/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py b/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py
index 5c2512a48..2f3db1dfe 100644
--- a/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py
+++ b/src/plugins/analysis/qemu_exec/test/test_plugin_qemu_exec.py
@@ -474,5 +474,5 @@ class MockFSOrganizer:
@staticmethod
def generate_path(fo):
if fo.uid != 'foo':
- return os.path.join(get_test_data_dir(), 'container/test.zip') # noqa: PTH118
+ return str(get_test_data_dir() / 'container/test.zip')
return None
diff --git a/src/test/acceptance/conftest.py b/src/test/acceptance/conftest.py
index f4c97fa35..c2304ef0b 100644
--- a/src/test/acceptance/conftest.py
+++ b/src/test/acceptance/conftest.py
@@ -9,7 +9,7 @@
@pytest.fixture(autouse=True)
-def _autouse_database_interfaces(database_interfaces): # noqa: ARG001
+def _autouse_database_interfaces(database_interfaces):
pass
@@ -27,12 +27,12 @@ def test_client(web_frontend):
@pytest.fixture
-def intercom_backend_binding(_unpacking_lock_manager, analysis_scheduler, comparison_scheduler, unpacking_scheduler):
+def intercom_backend_binding(unpacking_lock_manager, analysis_scheduler, comparison_scheduler, unpacking_scheduler):
_intercom_backend_binding = InterComBackEndBinding(
analysis_service=analysis_scheduler,
compare_service=comparison_scheduler,
unpacking_service=unpacking_scheduler,
- unpacking_locks=_unpacking_lock_manager,
+ unpacking_locks=unpacking_lock_manager,
)
_intercom_backend_binding.start()
@@ -76,7 +76,7 @@ def __init__(self, uid, path, name):
def upload_test_firmware(test_client, test_fw):
- testfile_path = Path(get_test_data_dir()) / test_fw.path
+ testfile_path = get_test_data_dir() / test_fw.path
with testfile_path.open('rb') as fp:
data = {
'file': (fp, test_fw.file_name),
diff --git a/src/test/acceptance/rest/test_rest_compare.py b/src/test/acceptance/rest/test_rest_compare.py
index b78434d62..413f9d09f 100644
--- a/src/test/acceptance/rest/test_rest_compare.py
+++ b/src/test/acceptance/rest/test_rest_compare.py
@@ -1,6 +1,5 @@
import urllib.parse
from base64 import standard_b64encode
-from pathlib import Path
import pytest
@@ -10,7 +9,7 @@
class TestRestCompareFirmware:
def _rest_upload_firmware(self, test_client, fw):
- testfile_path = Path(get_test_data_dir()) / fw.path
+ testfile_path = get_test_data_dir() / fw.path
file_content = testfile_path.read_bytes()
data = {
'binary': standard_b64encode(file_content).decode(),
diff --git a/src/test/acceptance/test_misc.py b/src/test/acceptance/test_misc.py
index 4d254c261..f8208d33c 100644
--- a/src/test/acceptance/test_misc.py
+++ b/src/test/acceptance/test_misc.py
@@ -1,5 +1,4 @@
import json
-import os
import time
from urllib.parse import quote
@@ -32,8 +31,8 @@ def _upload_firmware_get(self, test_client):
         assert b'Upload Firmware' in rv.data, 'upload page not displayed correctly'
def _upload_firmware_put(self, test_client, path, device_name, uid):
- testfile_path = os.path.join(get_test_data_dir(), path) # noqa: PTH118
- with open(testfile_path, 'rb') as fp: # noqa: PTH123
+ testfile_path = get_test_data_dir() / path
+ with testfile_path.open('rb') as fp:
data = {
'file': fp,
'device_name': device_name,
diff --git a/src/test/common_helper.py b/src/test/common_helper.py
index 22f2be87d..e990a1dce 100644
--- a/src/test/common_helper.py
+++ b/src/test/common_helper.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-import os
from base64 import standard_b64encode
from contextlib import contextmanager
from copy import deepcopy
@@ -17,11 +16,11 @@
from werkzeug.test import TestResponse
-def get_test_data_dir():
+def get_test_data_dir() -> Path:
"""
Returns the absolute path of the test data directory
"""
- return os.path.join(get_src_dir(), 'test/data') # noqa: PTH118
+ return Path(get_src_dir()) / 'test/data'
def create_test_firmware(
@@ -32,7 +31,7 @@ def create_test_firmware(
all_files_included_set=False,
version='0.1',
):
- fw = Firmware(file_path=os.path.join(get_test_data_dir(), bin_path)) # noqa: PTH118
+ fw = Firmware(file_path=str(get_test_data_dir() / bin_path))
fw.device_class = device_class
fw.device_name = device_name
fw.vendor = vendor
@@ -75,7 +74,7 @@ def create_test_firmware(
def create_test_file_object(bin_path='get_files_test/testfile1', uid=None, analyses=None):
- fo = FileObject(file_path=os.path.join(get_test_data_dir(), bin_path)) # noqa: PTH118
+ fo = FileObject(file_path=str(get_test_data_dir() / bin_path))
processed_analysis = {
'dummy': {
'summary': ['sum a', 'file exclusive sum b'],
@@ -145,7 +144,7 @@ class CommonDatabaseMock:
fo_uid = TEST_TEXT_FILE.uid
fw2_uid = TEST_FW_2.uid
- def __init__(self, config=None): # noqa: ARG002
+ def __init__(self, config=None):
self.tasks = []
self.locks = []
@@ -156,7 +155,7 @@ def get_read_only_session(self):
def update_view(self, file_name, content):
pass
- def get_object(self, uid, analysis_filter=None): # noqa: ARG002
+ def get_object(self, uid, analysis_filter=None):
if uid == TEST_FW.uid:
result = deepcopy(TEST_FW)
result.processed_analysis = {
@@ -180,7 +179,7 @@ def get_object(self, uid, analysis_filter=None): # noqa: ARG002
return result
return None
- def get_hid(self, uid, root_uid=None): # noqa: ARG002
+ def get_hid(self, uid, root_uid=None):
return 'TEST_FW_HID'
def get_device_class_list(self):
@@ -201,13 +200,13 @@ def get_number_of_total_matches(self, *_, **__):
def exists(self, uid):
return uid in (self.fw_uid, self.fo_uid, self.fw2_uid, 'error')
- def uid_list_exists(self, uid_list): # noqa: ARG002
+ def uid_list_exists(self, uid_list):
return set()
- def all_uids_found_in_database(self, uid_list): # noqa: ARG002
+ def all_uids_found_in_database(self, uid_list):
return True
- def get_data_for_nice_list(self, input_data, root_uid): # noqa: ARG002
+ def get_data_for_nice_list(self, input_data, root_uid):
return [NICE_LIST_DATA]
@staticmethod
@@ -218,7 +217,7 @@ def page_comparison_results():
def create_analysis_structure():
return ''
- def get_other_versions_of_firmware(self, fo): # noqa: ARG002
+ def get_other_versions_of_firmware(self, fo):
return []
def is_firmware(self, uid):
@@ -238,9 +237,7 @@ def get_summary(self, fo, selected_analysis):
@staticmethod
def comparison_exists(comparison_id):
- if comparison_id == COMPARISON_ID:
- return True
- return False
+ return comparison_id == COMPARISON_ID
@staticmethod
def get_comparison_result(comparison_id):
@@ -254,9 +251,7 @@ def get_comparison_result(comparison_id):
@staticmethod
def objects_exist(compare_id):
- if compare_id in ['existing_id', 'uid1;uid2', COMPARISON_ID]:
- return True
- return False
+ return compare_id in ['existing_id', 'uid1;uid2', COMPARISON_ID]
@staticmethod
def get_hid_dict(uid_set, root_uid): # noqa: ARG004
@@ -274,8 +269,8 @@ def fake_exit(self, *args): # noqa: ARG001
def get_firmware_for_rest_upload_test():
- testfile_path = os.path.join(get_test_data_dir(), 'container/test.zip') # noqa: PTH118
- with open(testfile_path, 'rb') as fp: # noqa: PTH123
+ testfile_path = get_test_data_dir() / 'container/test.zip'
+ with testfile_path.open('rb') as fp:
file_content = fp.read()
return {
'binary': standard_b64encode(file_content).decode(),
diff --git a/src/test/conftest.py b/src/test/conftest.py
index b3ccfaf03..596cf82b5 100644
--- a/src/test/conftest.py
+++ b/src/test/conftest.py
@@ -285,14 +285,14 @@ def analysis_finished_counter() -> Value:
@pytest.fixture
-def _unpacking_lock_manager() -> UnpackingLockManager: # noqa: PT005
+def unpacking_lock_manager() -> UnpackingLockManager:
_manager = UnpackingLockManager()
yield _manager
_manager.shutdown()
@pytest.fixture(name='test_config')
-def _scheduler_test_config(request) -> SchedulerTestConfig: # noqa: PT005
+def scheduler_test_config(request) -> SchedulerTestConfig:
return SchedulerTestConfig.get_instance_from_request(request)
@@ -305,13 +305,13 @@ def _store_file_if_not_exists(fs_organizer, file_object):
@pytest.fixture
-def analysis_scheduler(
- request, # noqa: ARG001
+def analysis_scheduler( # noqa: PLR0913
+ request,
pre_analysis_queue,
post_analysis_queue,
analysis_finished_event,
analysis_finished_counter,
- _unpacking_lock_manager,
+ unpacking_lock_manager,
test_config,
monkeypatch,
) -> AnalysisScheduler:
@@ -324,7 +324,7 @@ def analysis_scheduler(
monkeypatch.setattr('scheduler.analysis.plugin.FSOrganizer', test_config.fs_organizer_class)
_analysis_scheduler = AnalysisScheduler(
post_analysis=lambda *_: None,
- unpacking_locks=_unpacking_lock_manager,
+ unpacking_locks=unpacking_lock_manager,
)
fs_organizer = test_config.fs_organizer_class()
@@ -392,7 +392,7 @@ def unpacking_finished_counter() -> Value:
def unpacking_scheduler(
request,
post_unpack_queue,
- _unpacking_lock_manager,
+ unpacking_lock_manager,
test_config,
unpacking_finished_event,
unpacking_finished_counter,
@@ -416,7 +416,7 @@ def _post_unpack_hook(fw):
_unpacking_scheduler = UnpackingScheduler(
post_unpack=_post_unpack_hook,
fs_organizer=fs_organizer,
- unpacking_locks=_unpacking_lock_manager,
+ unpacking_locks=unpacking_lock_manager,
db_interface=test_config.backend_db_class,
)
add_task = _unpacking_scheduler.add_task
@@ -454,7 +454,7 @@ def comparison_finished_event(request) -> Event:
@pytest.fixture
-def comparison_scheduler(request, comparison_finished_event, test_config) -> ComparisonScheduler: # noqa: ARG001
+def comparison_scheduler(request, comparison_finished_event, test_config) -> ComparisonScheduler:
"""Returns an instance of :py:class:`~scheduler.comparison_scheduler.ComparisonScheduler`.
The scheduler has some extra testing features. See :py:class:`SchedulerTestConfig` for the features.
"""
diff --git a/src/test/integration/conftest.py b/src/test/integration/conftest.py
index 1b509bffc..65e3ac2b8 100644
--- a/src/test/integration/conftest.py
+++ b/src/test/integration/conftest.py
@@ -3,5 +3,5 @@
# Integration tests test the system as a whole so one can reasonably expect the database to be populated.
@pytest.fixture(autouse=True)
-def _autouse_database_interfaces(database_interfaces): # noqa: ARG001
+def _autouse_database_interfaces(database_interfaces):
pass
diff --git a/src/test/integration/scheduler/test_regression_virtual_file_path.py b/src/test/integration/scheduler/test_regression_virtual_file_path.py
index 21a3c7c79..36d3be21c 100644
--- a/src/test/integration/scheduler/test_regression_virtual_file_path.py
+++ b/src/test/integration/scheduler/test_regression_virtual_file_path.py
@@ -1,6 +1,5 @@
from __future__ import annotations
-from pathlib import Path
from queue import Empty
from typing import TYPE_CHECKING
@@ -23,7 +22,7 @@
def add_test_file(scheduler, path_in_test_dir):
- firmware = Firmware(file_path=str(Path(get_test_data_dir(), path_in_test_dir)))
+ firmware = Firmware(file_path=str(get_test_data_dir() / path_in_test_dir))
firmware.release_date = '1990-01-16'
firmware.version, firmware.vendor, firmware.device_name, firmware.device_class = ['foo'] * 4
scheduler.add_task(firmware)
diff --git a/src/test/unit/analysis/test_yara_plugin_base.py b/src/test/unit/analysis/test_yara_plugin_base.py
index 1671f574a..a34af385d 100644
--- a/src/test/unit/analysis/test_yara_plugin_base.py
+++ b/src/test/unit/analysis/test_yara_plugin_base.py
@@ -29,7 +29,7 @@ def test_get_signature_paths(self, analysis_plugin):
assert f'{intended_signature_path.rstrip("/")}.yc' == analysis_plugin.signature_path, 'signature path is wrong'
def test_process_object(self, analysis_plugin):
- test_file = FileObject(file_path=os.path.join(get_test_data_dir(), 'yara_test_file')) # noqa: PTH118
+ test_file = FileObject(file_path=str(get_test_data_dir() / 'yara_test_file'))
test_file.processed_analysis.update({analysis_plugin.NAME: []})
processed_file = analysis_plugin.process_object(test_file)
results = processed_file.processed_analysis[analysis_plugin.NAME]
@@ -38,7 +38,7 @@ def test_process_object(self, analysis_plugin):
assert results['summary'] == ['testRule']
def test_process_object_nothing_found(self, analysis_plugin):
- test_file = FileObject(file_path=os.path.join(get_test_data_dir(), 'zero_byte')) # noqa: PTH118
+ test_file = FileObject(file_path=str(get_test_data_dir() / 'zero_byte'))
test_file.processed_analysis.update({analysis_plugin.NAME: []})
processed_file = analysis_plugin.process_object(test_file)
assert len(processed_file.processed_analysis[analysis_plugin.NAME]) == 1, 'result present but should not'
diff --git a/src/test/unit/helperFunctions/test_file_system.py b/src/test/unit/helperFunctions/test_file_system.py
index d6fa8cefb..c83b08ce9 100644
--- a/src/test/unit/helperFunctions/test_file_system.py
+++ b/src/test/unit/helperFunctions/test_file_system.py
@@ -12,7 +12,7 @@
)
from test.common_helper import get_test_data_dir
-TEST_DATA_DIR = Path(get_test_data_dir())
+TEST_DATA_DIR = get_test_data_dir()
@pytest.fixture
@@ -62,4 +62,4 @@ def test_file_is_zero_broken_link():
def test_get_config_dir():
- assert os.path.exists(f'{get_config_dir()}/fact-core-config.toml'), 'main config file not found' # noqa: PTH110
+ assert Path(f'{get_config_dir()}/fact-core-config.toml').exists(), 'main config file not found'
diff --git a/src/test/unit/helperFunctions/test_hash.py b/src/test/unit/helperFunctions/test_hash.py
index f02b991ee..c8caaf608 100644
--- a/src/test/unit/helperFunctions/test_hash.py
+++ b/src/test/unit/helperFunctions/test_hash.py
@@ -1,5 +1,4 @@
import os
-from pathlib import Path
from helperFunctions.hash import (
_suppress_stdout,
@@ -31,7 +30,7 @@ def test_get_ssdeep():
def test_imphash():
- fo = create_test_file_object(bin_path=str(Path(get_test_data_dir(), 'test_executable')))
+ fo = create_test_file_object(bin_path=str(get_test_data_dir() / 'test_executable'))
fo.processed_analysis = {'file_type': {'result': {'mime': 'application/x-executable'}}}
imphash = get_imphash(fo)
assert isinstance(imphash, str), 'imphash should be a string'
diff --git a/src/test/unit/helperFunctions/test_program_setup.py b/src/test/unit/helperFunctions/test_program_setup.py
index 08f87f1da..f35b0cd23 100644
--- a/src/test/unit/helperFunctions/test_program_setup.py
+++ b/src/test/unit/helperFunctions/test_program_setup.py
@@ -6,7 +6,7 @@
class ArgumentMock:
- config_file = get_test_data_dir() + '/load_cfg_test'
+ config_file = f'{get_test_data_dir()}/load_cfg_test'
log_file = '/tmp/fact_test_argument_log_file.log'
log_level = 'DEBUG'
silent = False
diff --git a/src/test/unit/helperFunctions/test_yara_binary_search.py b/src/test/unit/helperFunctions/test_yara_binary_search.py
index d52c1b4dd..a4c033573 100644
--- a/src/test/unit/helperFunctions/test_yara_binary_search.py
+++ b/src/test/unit/helperFunctions/test_yara_binary_search.py
@@ -1,5 +1,5 @@
import unittest
-from os import path
+from pathlib import Path
from subprocess import CalledProcessError
from unittest import mock
from unittest.mock import patch
@@ -28,7 +28,7 @@ def mock_check_output(call, *_, shell=True, stderr=None, **__): # noqa: ARG001
@pytest.mark.backend_config_overwrite(
- {'firmware_file_storage_directory': path.join(get_test_data_dir(), TEST_FILE_1)}, # noqa: PTH118
+ {'firmware_file_storage_directory': str(get_test_data_dir() / TEST_FILE_1)},
)
class TestHelperFunctionsYaraBinarySearch(unittest.TestCase):
@mock.patch('helperFunctions.yara_binary_search.DbInterfaceCommon', MockCommonDbInterface)
@@ -96,20 +96,20 @@ def test_parse_raw_result(self):
}
def test_execute_yara_search(self):
- test_rule_path = path.join(get_test_data_dir(), 'yara_binary_search_test_rule') # noqa: PTH118
+ test_rule_path = get_test_data_dir() / 'yara_binary_search_test_rule'
result = self.yara_binary_scanner._execute_yara_search(test_rule_path)
assert 'test_rule' in result
def test_execute_yara_search_for_single_file(self):
- test_rule_path = path.join(get_test_data_dir(), 'yara_binary_search_test_rule') # noqa: PTH118
+ test_rule_path = get_test_data_dir() / 'yara_binary_search_test_rule'
result = self.yara_binary_scanner._execute_yara_search(
test_rule_path,
- target_path=path.join(get_test_data_dir(), TEST_FILE_1, TEST_FILE_1), # noqa: PTH118
+ target_path=get_test_data_dir() / TEST_FILE_1 / TEST_FILE_1,
)
assert 'test_rule' in result
def test_get_file_paths_of_files_included_in_fo(self):
result = self.yara_binary_scanner._get_file_paths_of_files_included_in_fw('single_firmware')
assert len(result) == 2
- assert path.basename(result[0]) == TEST_FILE_2 # noqa: PTH119
- assert path.basename(result[1]) == TEST_FILE_3 # noqa: PTH119
+ assert Path(result[0]).name == TEST_FILE_2
+ assert Path(result[1]).name == TEST_FILE_3
diff --git a/src/test/unit/scheduler/test_analysis.py b/src/test/unit/scheduler/test_analysis.py
index 06e192aa7..e7854d392 100644
--- a/src/test/unit/scheduler/test_analysis.py
+++ b/src/test/unit/scheduler/test_analysis.py
@@ -1,4 +1,3 @@
-import os
from multiprocessing import Queue
from time import sleep
from unittest import mock
@@ -49,7 +48,7 @@ def test_schedule_firmware_init_no_analysis_selected(self, analysis_scheduler):
@pytest.mark.SchedulerTestConfig(start_processes=True)
def test_whole_run_analysis_selected(self, analysis_scheduler, post_analysis_queue):
- test_fw = Firmware(file_path=os.path.join(get_test_data_dir(), 'get_files_test/testfile1')) # noqa: PTH118
+        test_fw = Firmware(file_path=str(get_test_data_dir() / 'get_files_test/testfile1'))
test_fw.scheduled_analysis = ['dummy_plugin_for_testing_only']
analysis_scheduler.start_analysis_of_object(test_fw)
analysis_results = [post_analysis_queue.get(timeout=10) for _ in range(3)]
@@ -113,7 +112,7 @@ def test_get_plugin_dict_version(self, analysis_scheduler):
), 'version not correct'
def test_process_next_analysis_unknown_plugin(self, analysis_scheduler):
- test_fw = Firmware(file_path=os.path.join(get_test_data_dir(), 'get_files_test/testfile1')) # noqa: PTH118
+        test_fw = Firmware(file_path=str(get_test_data_dir() / 'get_files_test/testfile1'))
test_fw.scheduled_analysis = ['unknown_plugin']
with mock_spy(analysis_scheduler, '_start_or_skip_analysis') as spy:
@@ -131,7 +130,7 @@ def test_process_next_analysis_unknown_plugin(self, analysis_scheduler):
}
)
def test_skip_analysis_because_whitelist(self, analysis_scheduler, post_analysis_queue):
- test_fw = Firmware(file_path=os.path.join(get_test_data_dir(), 'get_files_test/testfile1')) # noqa: PTH118
+        test_fw = Firmware(file_path=str(get_test_data_dir() / 'get_files_test/testfile1'))
test_fw.scheduled_analysis = ['file_hashes']
test_fw.processed_analysis['file_type'] = {'result': {'mime': 'text/plain'}}
analysis_scheduler._start_or_skip_analysis('dummy_plugin_for_testing_only', test_fw)
diff --git a/src/test/unit/unpacker/test_tar_repack.py b/src/test/unit/unpacker/test_tar_repack.py
index c988a4367..63fc33c61 100644
--- a/src/test/unit/unpacker/test_tar_repack.py
+++ b/src/test/unit/unpacker/test_tar_repack.py
@@ -1,5 +1,3 @@
-import os
-
import magic
from test.common_helper import get_test_data_dir
@@ -9,7 +7,7 @@
def test_tar_repack():
repack_service = TarRepack()
- file_path = os.path.join(get_test_data_dir(), 'container/test.zip') # noqa: PTH118
+ file_path = str(get_test_data_dir() / 'container/test.zip')
result = repack_service.tar_repack(file_path)
file_type = magic.from_buffer(result, mime=False)
assert 'gzip compressed data' in file_type, 'Result is not an tar.gz file'
diff --git a/src/test/unit/unpacker/test_unpacker.py b/src/test/unit/unpacker/test_unpacker.py
index 5c78420a5..47dcbe427 100644
--- a/src/test/unit/unpacker/test_unpacker.py
+++ b/src/test/unit/unpacker/test_unpacker.py
@@ -1,4 +1,3 @@
-from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
@@ -8,7 +7,7 @@
from test.common_helper import create_test_file_object, get_test_data_dir
from unpacker.unpack import Unpacker
-TEST_DATA_DIR = Path(get_test_data_dir())
+TEST_DATA_DIR = get_test_data_dir()
EXTRACTION_DIR = TEST_DATA_DIR / 'files'
diff --git a/src/unpacker/unpack_base.py b/src/unpacker/unpack_base.py
index 7c12b5ea7..1dbc9c65e 100644
--- a/src/unpacker/unpack_base.py
+++ b/src/unpacker/unpack_base.py
@@ -2,7 +2,8 @@
import logging
import shutil
-from os import getgid, getuid, makedirs
+from http import HTTPStatus
+from os import getgid, getuid
from pathlib import Path
from subprocess import CalledProcessError
@@ -47,7 +48,7 @@ def extract_files_from_file(
@staticmethod
def _initialize_shared_folder(tmp_dir):
for subpath in ['files', 'reports', 'input']:
- makedirs(str(Path(tmp_dir, subpath)), exist_ok=True) # noqa: PTH103
+            Path(tmp_dir, subpath).mkdir(parents=True, exist_ok=True)
@staticmethod
def _extract_with_worker(file_path: str, container: ExtractionContainer, tmp_dir: str):
@@ -57,7 +58,7 @@ def _extract_with_worker(file_path: str, container: ExtractionContainer, tmp_dir
raise ExtractionError('Timeout during extraction.') from error
except requests.exceptions.ConnectionError as error:
raise ExtractionError('Extraction container could not be reached.') from error
- if response.status_code != 200: # noqa: PLR2004
+ if response.status_code != HTTPStatus.OK:
logging.error(response.text, response.status_code)
raise ExtractionError(f'Extraction of {file_path} failed')
diff --git a/src/web_interface/components/analysis_routes.py b/src/web_interface/components/analysis_routes.py
index bd253f1a3..0752a9660 100644
--- a/src/web_interface/components/analysis_routes.py
+++ b/src/web_interface/components/analysis_routes.py
@@ -2,8 +2,8 @@
import json
import logging
-import os
from contextlib import suppress
+from pathlib import Path
from typing import TYPE_CHECKING
from common_helper_files import get_binary_from_file
@@ -33,9 +33,7 @@
def get_analysis_view(view_name):
- view_path = os.path.join( # noqa: PTH118
- get_src_dir(), f'web_interface/templates/analysis_plugins/{view_name}.html'
- )
+ view_path = Path(get_src_dir()) / f'web_interface/templates/analysis_plugins/{view_name}.html'
return get_binary_from_file(view_path).decode('utf-8')
diff --git a/src/web_interface/components/io_routes.py b/src/web_interface/components/io_routes.py
index 7666994a5..830e91b6f 100644
--- a/src/web_interface/components/io_routes.py
+++ b/src/web_interface/components/io_routes.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import http
import json
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -109,7 +110,7 @@ def show_radare(self, uid):
try:
host = config.frontend.radare2_url
response = requests.post(f'{host}/v1/retrieve', data=binary, verify=False)
- if response.status_code != 200: # noqa: PLR2004
+ if response.status_code != http.HTTPStatus.OK:
raise TimeoutError(response.text)
target_link = f"{host}{response.json()['endpoint']}m/"
sleep(1)
diff --git a/src/web_interface/components/jinja_filter.py b/src/web_interface/components/jinja_filter.py
index 622d4afb2..e7fbd2754 100644
--- a/src/web_interface/components/jinja_filter.py
+++ b/src/web_interface/components/jinja_filter.py
@@ -20,6 +20,8 @@
from storage.db_interface_frontend import MetaEntry
from web_interface.frontend_database import FrontendDatabase
+CHART_ELEMENT_COUNT_LIMIT = 100
+
class FilterClass:
"""
@@ -160,85 +162,85 @@ def data_to_chart_limited(self, data, limit: int | None = None, color_list=None)
def _get_chart_element_count(self):
limit = config.frontend.max_elements_per_chart
- if limit > 100: # noqa: PLR2004
+ if limit > CHART_ELEMENT_COUNT_LIMIT:
logging.warning('Value of "max_elements_per_chart" in configuration is too large.')
- return 100
+ return CHART_ELEMENT_COUNT_LIMIT
return limit
def data_to_chart(self, data):
color_list = get_color_list(1) * len(data)
return self.data_to_chart_limited(data, limit=0, color_list=color_list)
- def _setup_filters(self): # noqa: PLR0915
+ def _setup_filters(self):
self._app.jinja_env.add_extension('jinja2.ext.do')
-
- self._app.jinja_env.filters['all_items_equal'] = lambda data: len({str(value) for value in data.values()}) == 1
- self._app.jinja_env.filters['as_ascii_table'] = flt.as_ascii_table
- self._app.jinja_env.filters['auth_enabled'] = self.check_auth
- self._app.jinja_env.filters['base64_encode'] = flt.encode_base64_filter
- self._app.jinja_env.filters['bytes_to_str'] = flt.bytes_to_str_filter
- self._app.jinja_env.filters['data_to_chart'] = self.data_to_chart
- self._app.jinja_env.filters['data_to_chart_limited'] = self.data_to_chart_limited
- self._app.jinja_env.filters[
- 'data_to_chart_with_value_percentage_pairs'
- ] = flt.data_to_chart_with_value_percentage_pairs
- self._app.jinja_env.filters['decompress'] = flt.decompress
- self._app.jinja_env.filters['dict_to_json'] = json.dumps
- self._app.jinja_env.filters['fix_cwe'] = flt.fix_cwe
- self._app.jinja_env.filters['format_duration'] = flt.format_duration
- self._app.jinja_env.filters['format_string_list_with_offset'] = flt.filter_format_string_list_with_offset
- self._app.jinja_env.filters['get_canvas_height'] = flt.get_canvas_height
- self._app.jinja_env.filters['get_searchable_crypto_block'] = flt.get_searchable_crypto_block
- self._app.jinja_env.filters['get_unique_keys_from_list_of_dicts'] = flt.get_unique_keys_from_list_of_dicts
- self._app.jinja_env.filters['hex'] = hex
- self._app.jinja_env.filters['hide_dts_binary_data'] = flt.hide_dts_binary_data
- self._app.jinja_env.filters['infection_color'] = flt.infection_color
- self._app.jinja_env.filters['is_list'] = lambda item: isinstance(item, list)
- self._app.jinja_env.filters['json_dumps'] = json.dumps
- self._app.jinja_env.filters['link_cve'] = flt.replace_cve_with_link
- self._app.jinja_env.filters['link_cwe'] = flt.replace_cwe_with_link
- self._app.jinja_env.filters['list_group'] = flt.list_group
- self._app.jinja_env.filters['list_group_collapse'] = flt.list_group_collapse
- self._app.jinja_env.filters['list_to_line_break_string'] = flt.list_to_line_break_string
- self._app.jinja_env.filters['list_to_line_break_string_no_sort'] = flt.list_to_line_break_string_no_sort
- self._app.jinja_env.filters['md5_hash'] = get_md5
- self._app.jinja_env.filters['min'] = min
- self._app.jinja_env.filters['nice_generic'] = flt.generic_nice_representation
- self._app.jinja_env.filters['nice_number'] = flt.nice_number_filter
- self._app.jinja_env.filters['nice_time'] = time_format
- self._app.jinja_env.filters['nice_uid_list'] = self._filter_nice_uid_list
- self._app.jinja_env.filters['nice_unix_time'] = flt.nice_unix_time
- self._app.jinja_env.filters['nice_virtual_path_list'] = self._nice_virtual_path_list
- self._app.jinja_env.filters['number_format'] = flt.byte_number_filter
- self._app.jinja_env.filters['octal_to_readable'] = flt.octal_to_readable
- self._app.jinja_env.filters['print_program_version'] = self._filter_print_program_version
- self._app.jinja_env.filters['regex_meta'] = flt.comment_out_regex_meta_chars
- self._app.jinja_env.filters['remaining_time'] = elapsed_time
- self._app.jinja_env.filters['render_analysis_tags'] = flt.render_analysis_tags
- self._app.jinja_env.filters['render_general_information'] = self._render_general_information_table
- self._app.jinja_env.filters['render_query_title'] = flt.render_query_title
- self._app.jinja_env.filters['render_fw_tags'] = flt.render_fw_tags
- self._app.jinja_env.filters['replace_comparison_uid_with_hid'] = self._filter_replace_comparison_uid_with_hid
- self._app.jinja_env.filters['replace_uid_with_file_name'] = self._filter_replace_uid_with_file_name
- self._app.jinja_env.filters['replace_uid_with_hid_link'] = self._filter_replace_uid_with_hid_link
- self._app.jinja_env.filters['replace_uid_with_hid'] = self._filter_replace_uid_with_hid
- self._app.jinja_env.filters['replace_underscore'] = flt.replace_underscore_filter
- self._app.jinja_env.filters['version_is_compatible'] = flt.version_is_compatible
- self._app.jinja_env.filters['sort_chart_list_by_name'] = flt.sort_chart_list_by_name
- self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
- self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
- self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
- self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
- privileges, key=lambda role: len(privileges[role]), reverse=True
+ self._app.jinja_env.filters.update(
+ {
+ 'all_items_equal': lambda data: len({str(value) for value in data.values()}) == 1,
+ 'as_ascii_table': flt.as_ascii_table,
+ 'auth_enabled': self.check_auth,
+ 'base64_encode': flt.encode_base64_filter,
+ 'bytes_to_str': flt.bytes_to_str_filter,
+ 'data_to_chart': self.data_to_chart,
+ 'data_to_chart_limited': self.data_to_chart_limited,
+ 'data_to_chart_with_value_percentage_pairs': flt.data_to_chart_with_value_percentage_pairs,
+ 'decompress': flt.decompress,
+ 'dict_to_json': json.dumps,
+ 'fix_cwe': flt.fix_cwe,
+ 'format_duration': flt.format_duration,
+ 'format_string_list_with_offset': flt.filter_format_string_list_with_offset,
+ 'get_canvas_height': flt.get_canvas_height,
+ 'get_searchable_crypto_block': flt.get_searchable_crypto_block,
+ 'get_unique_keys_from_list_of_dicts': flt.get_unique_keys_from_list_of_dicts,
+ 'hex': hex,
+ 'hide_dts_binary_data': flt.hide_dts_binary_data,
+ 'infection_color': flt.infection_color,
+ 'is_list': lambda item: isinstance(item, list),
+ 'json_dumps': json.dumps,
+ 'link_cve': flt.replace_cve_with_link,
+ 'link_cwe': flt.replace_cwe_with_link,
+ 'list_group': flt.list_group,
+ 'list_group_collapse': flt.list_group_collapse,
+ 'list_to_line_break_string': flt.list_to_line_break_string,
+ 'list_to_line_break_string_no_sort': flt.list_to_line_break_string_no_sort,
+ 'md5_hash': get_md5,
+ 'min': min,
+ 'nice_generic': flt.generic_nice_representation,
+ 'nice_number': flt.nice_number_filter,
+ 'nice_time': time_format,
+ 'nice_uid_list': self._filter_nice_uid_list,
+ 'nice_unix_time': flt.nice_unix_time,
+ 'nice_virtual_path_list': self._nice_virtual_path_list,
+ 'number_format': flt.byte_number_filter,
+ 'octal_to_readable': flt.octal_to_readable,
+ 'print_program_version': self._filter_print_program_version,
+ 'regex_meta': flt.comment_out_regex_meta_chars,
+ 'remaining_time': elapsed_time,
+ 'render_analysis_tags': flt.render_analysis_tags,
+ 'render_general_information': self._render_general_information_table,
+ 'render_query_title': flt.render_query_title,
+ 'render_fw_tags': flt.render_fw_tags,
+ 'replace_comparison_uid_with_hid': self._filter_replace_comparison_uid_with_hid,
+ 'replace_uid_with_file_name': self._filter_replace_uid_with_file_name,
+ 'replace_uid_with_hid_link': self._filter_replace_uid_with_hid_link,
+ 'replace_uid_with_hid': self._filter_replace_uid_with_hid,
+ 'replace_underscore': flt.replace_underscore_filter,
+ 'version_is_compatible': flt.version_is_compatible,
+ 'sort_chart_list_by_name': flt.sort_chart_list_by_name,
+ 'sort_chart_list_by_value': flt.sort_chart_list_by_value,
+ 'sort_comments': flt.sort_comments,
+ 'sort_cve': flt.sort_cve_results,
+ 'sort_privileges': (
+ lambda privileges: sorted(privileges, key=lambda role: len(privileges[role]), reverse=True)
+ ),
+ 'sort_roles': flt.sort_roles_by_number_of_privileges,
+ 'sort_users': flt.sort_users_by_name,
+ 'split_user_and_password_type': self._split_user_and_password_type_entry,
+ 'str_to_hex': flt.str_to_hex,
+ 'text_highlighter': flt.text_highlighter,
+ 'uids_to_link': flt.uids_to_link,
+ 'user_has_role': flt.user_has_role,
+ 'version_links': flt.create_firmware_version_links,
+ 'vulnerability_class': flt.vulnerability_class,
+ '_linter_reformat_issues': flt.linter_reformat_issues,
+ }
)
- self._app.jinja_env.filters['sort_roles'] = flt.sort_roles_by_number_of_privileges
- self._app.jinja_env.filters['sort_users'] = flt.sort_users_by_name
- self._app.jinja_env.filters['split_user_and_password_type'] = self._split_user_and_password_type_entry
- self._app.jinja_env.filters['str_to_hex'] = flt.str_to_hex
- self._app.jinja_env.filters['text_highlighter'] = flt.text_highlighter
- self._app.jinja_env.filters['uids_to_link'] = flt.uids_to_link
- self._app.jinja_env.filters['user_has_role'] = flt.user_has_role
- self._app.jinja_env.filters['version_links'] = flt.create_firmware_version_links
- self._app.jinja_env.filters['vulnerability_class'] = flt.vulnerability_class
-
- self._app.jinja_env.filters['_linter_reformat_issues'] = flt.linter_reformat_issues
diff --git a/src/web_interface/rest/helper.py b/src/web_interface/rest/helper.py
index 60e465bb4..5087d6d64 100644
--- a/src/web_interface/rest/helper.py
+++ b/src/web_interface/rest/helper.py
@@ -1,15 +1,15 @@
-from __future__ import annotations
-
"""
This module offers neat wrapper functionality for use in rest endpoints.
Most wrappers target request and response parsing.
"""
-import calendar # noqa: E402
-import json # noqa: E402
-import time # noqa: E402
-from copy import deepcopy # noqa: E402
-from typing import TYPE_CHECKING # noqa: E402
+from __future__ import annotations
+
+import calendar
+import json
+import time
+from copy import deepcopy
+from typing import TYPE_CHECKING
if TYPE_CHECKING:
from werkzeug.datastructures import ImmutableMultiDict
@@ -81,13 +81,13 @@ def get_paging(request_parameters: ImmutableMultiDict) -> tuple[int, int]:
"""
try:
offset = int(request_parameters.get('offset', 0))
- except (TypeError, ValueError):
- raise ValueError('Malformed offset parameter') # noqa: B904
+ except (TypeError, ValueError) as error:
+ raise ValueError('Malformed offset parameter') from error
try:
limit = int(request_parameters.get('limit', 0))
- except (TypeError, ValueError):
- raise ValueError('Malformed limit parameter') # noqa: B904
+ except (TypeError, ValueError) as error:
+ raise ValueError('Malformed limit parameter') from error
return offset, limit
@@ -104,8 +104,8 @@ def get_query(request_parameters: ImmutableMultiDict) -> dict:
query = json.loads(query if query else '{}')
except (AttributeError, KeyError):
return {}
- except json.JSONDecodeError:
- raise ValueError('Query must be a json document') # noqa: B904
+ except json.JSONDecodeError as error:
+ raise ValueError('Query must be a json document') from error
if not isinstance(query, dict):
raise ValueError('Query must be a json document')
return query if query else {}
@@ -125,8 +125,8 @@ def get_boolean_from_request(request_parameters: ImmutableMultiDict, name: str)
raise TypeError()
except (AttributeError, KeyError):
return False
- except (json.JSONDecodeError, TypeError):
- raise ValueError(f'{name} must be true or false') # noqa: B904
+ except (json.JSONDecodeError, TypeError) as error:
+ raise ValueError(f'{name} must be true or false') from error
return parameter
@@ -140,10 +140,10 @@ def get_update(request_parameters: ImmutableMultiDict) -> list:
"""
try:
update = json.loads(request_parameters.get('update'))
- except (AttributeError, KeyError, TypeError):
- raise ValueError('Malformed or missing parameter: update') # noqa: B904
- except json.JSONDecodeError:
- raise ValueError('Update parameter has to be a list') # noqa: B904
+ except (AttributeError, KeyError, TypeError) as error:
+ raise ValueError('Malformed or missing parameter: update') from error
+ except json.JSONDecodeError as error:
+ raise ValueError('Update parameter has to be a list') from error
if not isinstance(update, list):
raise ValueError('Update must be a list')
if not update: