From 40bec78e0cf213132708d4b2fc074ee8ab7dfc72 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sun, 11 Feb 2018 20:29:44 +0300 Subject: [PATCH] move to v2.0.0 (#13) * extend tests for RUZ.person_lessons * [skip ci] move to v1.0.1 * change url for API v2 * [skip ci] move to v1.1.0 * fix email checkers use lower() method before check * move to v1.1.1 * update gitignore: remove redundant files * add "hell03end" to license * move schema from utils to root folder * move logging from utils to root, update logging * combine utils to single file, update none_safe decorator, other fixes * simplify RUZ logic, remove RUZ class, rename to main, remove CamelCase methods, other fixes * update __init__.py * refresh tests * update setup.py (prepare to v2.0.0) * add find_by_str, remove staff_of_streams * update tests * update travis.yml and readme * add dev requirements * remove python < 3.5 from supported versions --- .gitignore | 107 +----- .travis.yml | 6 +- LICENSE | 2 +- README.rst | 14 +- requirements.txt | 2 + ruz/RUZ.py | 785 -------------------------------------- ruz/__init__.py | 20 +- ruz/logging.py | 121 ++++++ ruz/main.py | 443 +++++++++++++++++++++ ruz/{utils => }/schema.py | 17 +- ruz/utils.py | 36 ++ ruz/utils/__init__.py | 12 - ruz/utils/decorators.py | 56 --- ruz/utils/logging.py | 81 ---- setup.py | 42 +- tests/__init__.py | 28 ++ tests/test_RUZ.py | 370 +++++------------- tests/test_schema.py | 219 +++++------ tests/test_utils.py | 40 -- 19 files changed, 891 insertions(+), 1510 deletions(-) delete mode 100644 ruz/RUZ.py create mode 100644 ruz/logging.py create mode 100644 ruz/main.py rename ruz/{utils => }/schema.py (95%) create mode 100644 ruz/utils.py delete mode 100644 ruz/utils/__init__.py delete mode 100644 ruz/utils/decorators.py delete mode 100644 ruz/utils/logging.py delete mode 100644 tests/test_utils.py diff --git a/.gitignore b/.gitignore index e6e62e8..273ef7c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,104 +1,15 @@ -tmp/ -*.dump - -# Byte-compiled / optimized / DLL files __pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -env/ +.vscode/ +.cache/ build/ -develop-eggs/ dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# dotenv -.env - -# virtualenv +wiki/ +logs/ .venv venv/ -ENV/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site +.env +env/ +tmp/ -# mypy -.mypy_cache/ +*.log +*.py[cod] diff --git a/.travis.yml b/.travis.yml index d700a48..8f835de 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,14 +3,12 @@ dist: trusty sudo: false language: python python: - - "3.3" + # - "3.3" - "3.4" - "3.5" - "3.6" - "nightly" - "pypy3" -cache: - pip: true # caches $HOME/.cache/pip branches: only: @@ -19,7 +17,7 @@ branches: env: matrix: allow_failures: - - python: "pypy3" + - python: "3.4" install: - pip install -r requirements.txt diff --git a/LICENSE b/LICENSE index 72ca951..76e3790 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2017 Dmitriy Pchelkin +Copyright (c) 2017 Dmitriy Pchelkin | hell03end Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.rst b/README.rst index 0a447a0..6db1b55 100644 --- a/README.rst +++ b/README.rst @@ -12,13 +12,11 @@ Python wrapper for HSE RUZ API. __ https://github.com/hell03end/hse_ruz/wiki/Changelog -Feel free to contribute. - Requirements ------------ -* Python Python 3.3+ or PyPy3 +* Python >= 3.5 or latest PyPy3 Installation @@ -26,8 +24,6 @@ Installation .. code-block:: bash - pip install hse_ruz - # or update pip install -U hse_ruz @@ -36,17 +32,13 @@ Usage .. code-block:: python - from ruz import RUZ - api = RUZ() - assert api.v == 1 - assert api.person_lessons("mymail@edu.hse.ru") + import ruz + schedule = ruz.person_lessons("mymail@edu.hse.ru") Contributing ------------ -Please, use type annotations. - .. code-block:: bash git clone https://github.com/hell03end/hse_ruz.git diff --git a/requirements.txt b/requirements.txt index add7ca1..82e5344 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ pytest>=3.2.1 +requests==2.11.1 +twine==1.9.1 diff --git a/ruz/RUZ.py b/ruz/RUZ.py deleted file mode 100644 index 8500083..0000000 --- a/ruz/RUZ.py +++ /dev/null @@ -1,785 +0,0 @@ -import json -import re -from collections import Iterable -from copy import deepcopy -from datetime import datetime as dt -from datetime import timedelta as td -from functools import lru_cache -from http.client import HTTPResponse -from urllib import error, parse, request - -from ruz.utils import (EMAIL_DOMAINS, EMAIL_PATTERN, REQUEST_SCHEMA, - RUZ_API2_ENDPOINTS, RUZ_API2_URL, RUZ_API_ENDPOINTS, - RUZ_API_URL, Logger, log, none_safe) - - -class RUZ(object): - """ - Handler for RUZ API - - Both CamelCase and snake_notation supported for method names and - params. CamelCase is depreciated. - - Usage - ----- - >>> api = RUZ() - """ - - def __init__(self, strict_v1: bool=False, **kwargs): - """ - :param strict_v1 - force usage of api v1. - - Advanced - -------- - :param base_url :type str - valid entry point to HSE RUZ API. - :param endpoints :type dict - endpoints for API. - :param schema :type dict - schema for request params of each - endpoint (to validate params). - :param domains :type Iterable - collection of valid HSE domains. - - Usage - ----- - >>> RUZ(base_url=None) - Traceback (most recent call last): - ... - PermissionError: Can't get base url! - >>> RUZ(strict_v1=False)._url2[-2] - '2' - >>> RUZ()._url2 == RUZ()._url - True - """ - self._url = kwargs.pop('base_url', RUZ_API_URL) - if not self._url or not isinstance(self._url, str): - raise PermissionError("Can't get base url!") - self._endpoints = kwargs.pop('endpoints', RUZ_API_ENDPOINTS) - if not self._endpoints or not isinstance(self._endpoints, dict): - raise ValueError("Can't find correct endpoints!") - self._schema = kwargs.pop('schema', REQUEST_SCHEMA) - if not isinstance(self._schema, (dict, list)): - raise ValueError( - "Expect list or dict, got: {}".format(type(self._schema)) - ) - self._domains = kwargs.pop('domains', EMAIL_DOMAINS) - if not isinstance(self._domains, tuple): - raise ValueError( - "Expect domains as tuple, got: {}".format(type(self._domains)) - ) - self._logger = Logger(str(self.__class__)) - self._url2 = self._url - self._endpoints2 = self._endpoints - self._v = 1 - if not strict_v1: - self._url2 = kwargs.pop('base_url', RUZ_API2_URL) - self._endpoints2 = kwargs.pop('endpoints', RUZ_API2_ENDPOINTS) - self._v = 2 - super(RUZ, self).__init__() - - @property - def ok(self) -> bool: - """ - Check internet connection is ok by connecting to google servers - - Usage - ----- - >>> RUZ().ok - True - """ - try: - r = request.urlopen(r"http://google.com") - del r - except error.URLError as err: - self._logger.info("Can't connect to google.com: %s", err) - return False - return True - - @property - def schema(self) -> dict: - """ - (copy of) Current request schema - - Usage - ----- - >>> RUZ().schema is not RUZ().schema - True - """ - return deepcopy(self._schema) - - @property - def v(self) -> int: - """ - Max API version - - Usage - ----- - >>> RUZ(strict_v1=True).v - 1 - >>> RUZ(strict_v1=False).v - 2 - """ - return self._v - - @property - def domains(self) -> tuple: - """ - Collection of valid HSE email domains - - Usage - ----- - >>> isinstance(RUZ().domains, tuple) - True - """ - return self._domains - - @property - def endpoints(self) -> dict: - """ - Collection HSE API endpoints - - Usage - ----- - >>> isinstance(RUZ().endpoints, dict) - True - >>> RUZ().endpoints is not RUZ().endpoints - True - """ - return deepcopy(self._endpoints) - - @property - def endpoints2(self) -> dict: - """ - Collection HSE API endpoints for API v2 - - Usage - ----- - >>> isinstance(RUZ().endpoints2, dict) - True - >>> RUZ().endpoints2 is not RUZ().endpoints2 - True - """ - return deepcopy(self._endpoints2) - - @staticmethod - def is_student(email: str) -> bool: - """ - Check user is student or not by HSE email address - - :param email, required - valid HSE email addres or domain. - - Usage - ----- - >>> RUZ.is_student("somemail@hse.ru") - False - >>> RUZ.is_student("somemail@edu.hse.ru") - True - """ - if not isinstance(email, str): - raise ValueError("Expect str, got: {}".format(type(email))) - domain = email - if r"@" in email: - domain = email.split(r'@')[-1] - if domain == "edu.hse.ru": - return True - elif domain == "hse.ru": - return False - raise ValueError("Wrong HSE domain: {}".format(domain)) - - @log - def _make_url(self, endpoint: str, data: dict=None, v: int=1) -> str: - """ - Creates full url for API requests - - :param endpoint - endpoint for request. - :param data - request params. - :param v - API version to use (v1 is always used as fallback). - - Usage - ----- - >>> RUZ()._make_url("schedule") - http://92.242.58.221/ruzservice.svc/personLessons - >>> RUZ()._make_url("schedule", data={'email': 123}) - http://92.242.58.221/ruzservice.svc/personLessons?email=123 - """ - url = self._url - endpoint = self._endpoints[endpoint] - if v != 1 and self.v != 1: - url = self._url2 - endpoint = self._endpoints2[endpoint] - if data: - return "{}{}?{}".format(url, endpoint, parse.urlencode(data)) - return "{}{}".format(url, endpoint) - - @log - def _request(self, endpoint: str, data: dict=None) -> HTTPResponse: - """ - Implements request to API with given params - - :param endpoint - endpoint for request. - :param data - request params. - - Usage - ----- - >>> RUZ()._request("schedule", data={'email': "user@hse.ru"}) - Traceback (most recent call last): - ... - urllib.error.HTTPError: HTTP Error 400: Bad Request - """ - if self._v == 2: - # api v2 may be unreachable for some API methods - try: - return request.urlopen(self._make_url(endpoint, data, v=2)) - except (error.HTTPError, error.URLError) as excinfo: - self._logger.warning("v2 API unreachable for '%s': %s", - endpoint, excinfo) - return request.urlopen(self._make_url(endpoint, data)) - - @log - def _verify_schema(self, endpoint: str, **params) -> None: - """ - Check params fit schema for certain endpoint - - :param endpoint - endpoint for request. - :param check_online :type bool - check email throw API call. - schema params passes with **params. - - Usage - ----- - >>> RUZ()._verify_schema("") - Traceback (most recent call last): - ... - ValueError: Wrong endpoint: '' - """ - if (endpoint == "schedule" and "lecturerOid" not in params and - "studentOid" not in params and "email" not in params and - "auditoriumOid" not in params): - raise ValueError("One of the followed required: lecturer_id, " - "auditorium_id, student_id, email for " - "schedule endpoint.") - email = params.get('email') - if email: - self.verify_email( - email, - params.get('receiverType', 3), - params.pop('check_online', False) - ) - endpoint = self._endpoints[endpoint] # it's ok to use only v1 here - schema = self._schema.get(endpoint) - if schema is None: - raise KeyError("Wrong endpoint: '{}'".format(endpoint)) - for key, value in params.items(): - if key not in schema: - raise KeyError("Wrong param '{}' for '{}' endpoint".format( - key, endpoint - )) - if not isinstance(value, schema[key]): - raise ValueError("Expected {} for '{}'::'{}' got: {}".format( - schema[key], endpoint, key, type(value) - )) - - @staticmethod - def check_email(email: str, pattern: str=EMAIL_PATTERN) -> None: - """ - Check email is valid HSE corp. email - - Throws an exception - - :param email, required - email address to check. - :param pattern - pattern to check against. - - Usage - ----- - >>> RUZ.check_email("somemail@hse.com") - Traceback (most recent call last): - ... - ValueError: Wrong email address: somemail@hse.com - >>> not RUZ.check_email("somemail@edu.hse.ru") - True - >>> RUZ.check_email("somem@il@edu.hse.ru") - Traceback (most recent call last): - ... - ValueError: Wrong email address: somem@il@edu.hse.ru - >>> RUZ.check_email("somemail@google.ru") - Traceback (most recent call last): - ... - ValueError: Wrong email domain: google.ru - """ - if not isinstance(email, str): - raise ValueError("Expect str, got: {}".format(type(email))) - elif not re.match(pattern, email): - raise ValueError("Wrong email address: {}".format(email)) - - domain = email.split('@')[-1] - if domain not in EMAIL_DOMAINS: - raise ValueError("Wrong email domain: {}".format(domain)) - del domain - - @staticmethod - def date(day_bias: int=0) -> str: - """ - Return date in RUZ API compatible format - - :param day_bias - number of day from now. - - Usage - ----- - >>> isinstance(RUZ.date(), str) - True - >>> RUZ.date('abc') - Traceback (most recent call last): - ... - ValueError: Expect int, got: - """ - if not isinstance(day_bias, int): - raise ValueError("Expect int, got: {}".format(type(day_bias))) - if day_bias < 0: - return (dt.now() - td(days=-day_bias)).strftime(r'%Y.%m.%d') - return (dt.now() + td(days=day_bias)).strftime(r'%Y.%m.%d') - - @lru_cache(maxsize=128) - @log - def verify_email(self, email: str, receiver_type: int=3, - check_online: bool=False) -> None: - """ - Check email is valid for given receiver type (to use in API) - - Throw an exception. - - :param email - email address to check (for schedules only). - :param receiver_type - type of requested schedule if any. - :param check_online - check email throw API call. - - Usage - ----- - >>> RUZ().verify_email("somemail@hse.com") - Traceback (most recent call last): - ... - ValueError: Wrong email address: somemail@hse.com - >>> not RUZ().verify_email("somemail@edu.hse.ru") - True - >>> RUZ().verify_email("somem@il@edu.hse.ru") - Traceback (most recent call last): - ... - ValueError: Wrong email address: somem@il@edu.hse.ru - >>> RUZ().verify_email("somemail@google.ru") - Traceback (most recent call last): - ... - ValueError: Wrong email domain: google.ru - >>> RUZ().verify_email("somemail@hse.ru", -1) - Traceback (most recent call last): - ... - ValueError: Wrong receiverType: -1 - >>> RUZ().verify_email("somemail@hse.ru", 2) - Traceback (most recent call last): - ... - ValueError: No email needed for receiverType: 2 - """ - RUZ.check_email(email) - - domain = email.split('@')[-1] - if receiver_type == 1: - if domain != "hse.ru": - self._logger.warning("Wrong domain for teacher: %s", domain) - elif receiver_type == 2: - del domain - raise ValueError("No email needed for receiverType: 2") - elif receiver_type == 3: - if domain != "edu.hse.ru": - self._logger.warning("Wrong domain for student: %s", domain) - else: - del domain - raise ValueError("Wrong receiverType: {}".format(receiver_type)) - - if check_online: - try: - response = self._request( - "schedule", - data={ - 'email': email, - 'fromDate': RUZ.date(), - 'toData': RUZ.date(1) - } - ) - del response - except (error.HTTPError, error.URLError) as excinfo: - self._logger.debug(excinfo) - raise ValueError( - "(online) Wrong HSE email address: {}".format(email) - ) - - @none_safe - @log - def _get(self, endpoint: str, encoding: str="utf-8", safe: bool=True, - **params) -> (list, None): - """ - Return requested data in JSON - - Check request has correct schema. Throw an exception. - - :param endpoint - endpoint for request. - :param encoding - encoding for received data. - :param safe - return empty list even if no data received. - Request params passes throw **params. - - Usage - ----- - >>> isinstance(RUZ()._get("kindOfWorks"), list) - True - """ - self._verify_schema(endpoint, **params) - try: - response = self._request(endpoint, data=params) - return json.loads(response.read().decode(encoding)) - except error.HTTPError as excinfo: - self._logger.error(excinfo) - if safe: - return [] - return - - def _map_schedules(self, key: str, vals: Iterable, - allowed_types: tuple=(str,), **params) -> map: - """ - Return map for fetching schedules with given vals - - Throw an exception. - - :param name - name of API param. - :param vals - values to map with person_lessons. - :param allowed_types - type(s) of values. - - Usage - ----- - >>> isinstance(RUZ()._map_schedules("abc"), map) - True - """ - if not isinstance(key, str): - raise ValueError("Expect str, got: {}".format(type(key))) - if isinstance(vals, allowed_types): - vals = (vals,) - if not isinstance(vals, (tuple, set, list)): - raise ValueError("Expect Iterable or {}, got: {}".format( - allowed_types, type(vals))) - - def func(val: dict, key: str=key, params: dict=params) -> list: - params.update({key: val}) - return self.person_lessons(**params) - return map(func, vals) - - def schedules(self, - emails: Iterable=None, - lecturer_ids: Iterable=None, - auditorium_ids: Iterable=None, - student_ids: Iterable=None, - **params) -> map: - """ - Classes schedule for multiply students/lecturers as generator - - See RUZ::person_lessons for more details. - Throw an exception. - - :param emails - emails on hse.ru (edu.hse.ru for students). - :param lecturer_ids - IDs of teacher. - :param auditorium_ids - IDs of auditorium. - :param student_ids - IDs of student. - - One of the followed required: lecturer_ids, auditorium_ids, - student_ids, emails. - - Usage - ----- - >>> RUZ().schedules(emails=123) - Traceback (most recent call last): - ... - ValueError: Expect Iterable or (,), got: - >>> RUZ().schedules(lecturer_ids='abc') - Traceback (most recent call last): - ... - ValueError: Expect Iterable or (,), got: - """ - # support CamelCase notation - lecturer_ids = params.pop('lecturerOids', lecturer_ids) - auditorium_ids = params.pop('auditoriumOids', auditorium_ids) - student_ids = params.pop('studentOids', student_ids) - if emails: - return self._map_schedules("email", emails, **params) - elif lecturer_ids: - return self._map_schedules("lecturer_id", lecturer_ids, (int,), - **params) - elif auditorium_ids: - return self._map_schedules("auditorium_id", auditorium_ids, (int,), - **params) - elif student_ids: - return self._map_schedules("student_id", student_ids, (int,), - **params) - else: - raise ValueError("One of the followed required: lecturer_ids, " - "auditorium_ids, student_ids, emails") - - def person_lessons(self, - email: str=None, - from_date: str=dt.now().strftime(r'%Y.%m.%d'), - to_date: str=(dt.now() + - td(days=6)).strftime(r'%Y.%m.%d'), - receiver_type: int=None, - lecturer_id: int=None, - auditorium_id: int=None, - student_id: int=None, - **params) -> list: - """ - Return classes schedule (for week by default) - - Automatically choose receiver type from given email address. - If no email provided and auditorium_id is not None, automatically - set receiver type for auditoriums. - - Default values (fromDate, toDate) are set to return schedule for - one week from now. - - :param email - email on hse.ru (edu.hse.ru for students). - :param from_date, required - start of the period YYYY.MM.DD. - :param to_date, required - end of the period YYYY.MM.DD. - :param receiver_type - type of the schedule - (1/2/3 for teacher/auditorium/student). - :param lecturer_id - ID of teacher. - :param auditorium_id - ID of auditorium. - :param student_id - ID of student. - :param check_online :type bool - online verification for email. - :param safe :type bool - return something even if no data received. - - One of the followed required: lecturer_id, auditorium_id, - student_id, email. - - Usage - ----- - >>> assert RUZ().person_lessons("somemail@hse.ru") - """ - if receiver_type is None and email is not None: - if not RUZ.is_student(email): - receiver_type = 1 - elif receiver_type is None and lecturer_id is not None: - receiver_type = 1 - elif receiver_type is None and auditorium_id is not None: - receiver_type = 2 - - # no need to specify receiver type for students explicitly - if receiver_type == 3: - receiver_type = None - - return self._get( - "schedule", - fromDate=params.pop('fromDate', from_date), - toDate=params.pop('toDate', to_date), - email=email, - receiverType=params.pop('receiverType', receiver_type), - lecturerOid=params.pop('lecturerOid', lecturer_id), - auditoriumOid=params.pop('auditoriumOid', auditorium_id), - studentOid=params.pop('studentOid', student_id), - **params - ) - - def groups(self, faculty_id: int=None, **params) -> list: - """ - Return collection of groups - - :param faculty_id - course ID. - - Usage - ----- - >>> assert RUZ().groups() - """ - return self._get( - "groups", - facultyOid=params.pop('facultyOid', faculty_id) - ) - - def staff_of_group(self, group_id: int, **params) -> list: - """ - Return collection of students in group - - :param group_id, required - group' ID. - - Usage - ----- - >>> assert RUZ().staff_of_group(1) - """ - return self._get( - "staffOfGroup", - groupOid=params.pop('groupOid', group_id) - ) - - def streams(self) -> list: - """ - Return collection of study streams - - Cache requested values. - - Usage - ----- - >>> assert RUZ().streams() - """ - return self._get("streams") - - def staff_of_streams(self, stream_id: int, **params) -> list: - """ - Return collection of the groups on study stream - - :param stream_id, required - group' ID. - - Usage - ----- - >>> assert RUZ().staff_of_streams(45771) - """ - return self._get( - "staffOfStreams", - streamOid=params.pop('streamOid', stream_id) - ) - - def lecturers(self, chair_id: int=None, **params) -> list: - """ - Return collection of teachers - - :param chair_id - ID of department. - - Usage - ----- - >>> assert RUZ().lecturers() - """ - return self._get( - "lecturers", - chairOid=params.pop('chairOid', chair_id) - ) - - def auditoriums(self, building_id: int=None, **params) -> list: - """ - Return collection of auditoriums - - :param building_id - ID of building. - - Usage - ----- - >>> assert RUZ().auditoriums() - """ - return self._get( - "auditoriums", - buildingOid=params.pop('buildingOid', building_id) - ) - - @lru_cache(maxsize=1) - def type_of_auditoriums(self) -> list: - """ - Return collection of auditoriums' types - - Cache requested values. - - Usage - ----- - >>> assert RUZ().type_of_auditoriums() - """ - return self._get("typeOfAuditoriums") - - @lru_cache(maxsize=1) - def kind_of_works(self) -> list: - """ - Return collection of classes' types - - Cache requested values. - - Usage - ----- - >>> assert RUZ().kind_of_works() - """ - return self._get("kindOfWorks") - - @lru_cache(maxsize=1) - def buildings(self) -> list: - """ - Return collection of buildings - - Cache requested values. - - Usage - ----- - >>> assert RUZ().buildings() - """ - return self._get("buildings") - - def faculties(self) -> list: - """ - Return collection of learning programs (faculties) - - Usage - ----- - >>> assert RUZ().faculties() - """ - return self._get("faculties") - - def chairs(self, faculty_id: int=None, **params) -> list: - """ - Return collection of departments - - :param faculty_id - ID of course (learning program). - - Usage - ----- - >>> assert RUZ().chairs() - """ - return self._get( - "chairs", - facultyOid=params.pop('facultyOid', faculty_id) - ) - - def sub_groups(self) -> list: - """ - Return collection of subgroups - - Usage - ----- - >>> assert RUZ().sub_groups() - """ - return self._get("subGroups") - - def __bool__(self) -> bool: - return self.ok - - # aliases - def schedule(self, *args, **kwargs) -> list: - """ - Alias for person_lessons method (backward compitability) - - (depreciated) - Uses same args and kwargs as in person_lessons method. - Look for more info in person_lessons help. - - Usage - ----- - >>> assert RUZ().schedule("mysuperawesomeemail@hse.ru") - """ - return self.person_lessons(*args, **kwargs) - - # for CamelCase compatibility - def subGroups(self) -> list: - return self.sub_groups() - subGroups.__doc__ = sub_groups.__doc__ - - def kindOfWorks(self) -> list: - return self.kind_of_works() - kindOfWorks.__doc__ = kind_of_works.__doc__ - - def typeOfAuditoriums(self) -> list: - return self.type_of_auditoriums() - typeOfAuditoriums.__doc__ = type_of_auditoriums.__doc__ - - def staffOfStreams(self, *args, **kwargs) -> list: - return self.staff_of_streams(*args, **kwargs) - staffOfStreams.__doc__ = staff_of_streams.__doc__ - - def staffOfGroup(self, *args, **kwargs) -> list: - return self.staff_of_group(*args, **kwargs) - staffOfGroup.__doc__ = staff_of_group.__doc__ - - def personLessons(self, *args, **kwargs) -> list: - return self.person_lessons(*args, **kwargs) - personLessons.__doc__ = person_lessons.__doc__ - - -if __name__ == "__main__": - import doctest - doctest.testmod() diff --git a/ruz/__init__.py b/ruz/__init__.py index db97cc0..84dae45 100644 --- a/ruz/__init__.py +++ b/ruz/__init__.py @@ -3,17 +3,15 @@ Usage ----- - from ruz import RUZ - api = RUZ() - assert api.v == 2 - assert api.person_lessons("mymail@edu.hse.ru") + import ruz + assert ruz.person_lessons("mymail@edu.hse.ru") """ -import os +from ruz.main import (auditoriums, buildings, chairs, faculties, find_by_str, + get_formated_date, groups, is_hse_email, is_student, + is_valid_hse_email, kind_of_works, lecturers, + person_lessons, schedules, staff_of_group, streams, + sub_groups, type_of_auditoriums) -from ruz.RUZ import RUZ -from ruz.utils import REQUEST_SCHEMA, RESPONSE_SCHEMA, EMAIL_DOMAINS - -__author__ = "hell03end" -__version__ = (1, 1, 0) -__all__ = ("RUZ", "REQUEST_SCHEMA", "RESPONSE_SCHEMA") +__author__ = "Dmitriy Pchelkin | hell03end" +__version__ = (2, 0, 1) diff --git a/ruz/logging.py b/ruz/logging.py new file mode 100644 index 0000000..866cb4a --- /dev/null +++ b/ruz/logging.py @@ -0,0 +1,121 @@ +import logging +import traceback +from collections import Callable +from functools import wraps +from logging import RootLogger + + +# Default logging behavior +logging.basicConfig( + level=logging.WARNING, + format="[%(asctime)s] %(levelname)s " + "[%(name)s.{%(filename)s}.%(funcName)s:%(lineno)d] %(message)s", + datefmt="%H:%M:%S" +) + + +class Log: + """ Context manager for events' logging. Handling (log) exceptions. """ + + def __init__(self, + case_name: str, + level: int=logging.DEBUG, + logger: RootLogger=None, + **kwargs) -> None: + self._name = case_name + self._level = level + self._logger = logger if logger is not None else None + self._enter_msg = kwargs.pop("enter_msg", "ENTERING::") + self._exit_msg = kwargs.pop("exit_msg", "EXITING::") + self._exc_msg = kwargs.pop("exc_msg", "") + self._silent = kwargs.pop("silent", False) + + def _log(self, *message, level: int=None) -> None: + """ log message """ + if level is None: + level = self._level + + if level == logging.ERROR: + if self._logger: + self._logger.error(*message) + else: + logging.error(*message) + elif level > logging.DEBUG: + if self._logger: + self._logger.info(*message) + else: + logging.info(*message) + else: + if self._logger: + self._logger.debug(*message) + else: + logging.debug(*message) + + def __enter__(self) -> object: + """ Returns it's logger instance of self if silent """ + if not self._silent: + self._log("%s%s", self._enter_msg, self._name) + return self + + def __exit__(self, + exc_type: object=None, + exc_val: object=None, + tb: object=None) -> None: + """ Handling (log) exceptions """ + if exc_type is not None: + self._log( + "%s\n%s%s: %s\n", + self._exc_msg, + "\n".join([s.strip(r"\n") for s in traceback.format_tb(tb)]), + exc_type.__name__, + exc_val + ) + + if not self._silent: + self._log("%s%s", self._exit_msg, self._name) + + +class _FuncLog(Log): + """ Context manager for logging function calls """ + + def __init__(self, + case_name: str, + level: int=logging.DEBUG, + logger: RootLogger=None): + super(_FuncLog, self).__init__( + case_name=case_name, + level=level, + enter_msg="===> ", + exit_msg="<--- ", + logger=logger + ) + + +def log(log_result: bool=True, + log_args: bool=True, + log_kwargs: bool=True, + name: str=None, + level: int=logging.DEBUG, + logger: RootLogger=None) -> Callable: + """ Returns decorator for logging function/method behavior """ + def decor(func: Callable) -> Callable: + func_name = name + if not func_name: + func_name = func.__name__ + + @wraps(func) + def wrapper(*args, **kwargs) -> object: + with _FuncLog(func_name, level, logger=logger): + if log_args: + for arg in args: + logging.debug("()::%s", arg) + if log_kwargs: + for key, value in kwargs.items(): + logging.debug(r"{}::%s=%s", key, value) + + result = func(*args, **kwargs) + if log_result: + logging.debug("RETURN(%s)::%s", func_name, result) + return result + return wrapper + return decor diff --git a/ruz/main.py b/ruz/main.py new file mode 100644 index 0000000..b6f52a8 --- /dev/null +++ b/ruz/main.py @@ -0,0 +1,443 @@ +import json +import re +from collections import Callable, Iterable +from datetime import datetime, timedelta +from functools import lru_cache +from urllib import error, parse, request + +from ruz.logging import log, logging +from ruz.schema import REQUEST_SCHEMA, RUZ_API_ENDPOINTS +from ruz.utils import (CHECK_EMAIL_ONLINE, RUZ_API_URL, RUZ_API_V, + USE_NONE_SAFE_VALUES, none_safe) + + +# ===== Common methods ===== + +def is_student(email: str) -> bool or None: + """ + Check email belongs to student + + :param email, required - valid HSE email addres or domain. + + Stutent's domain: @edu.hse.ru + HSE stuff' domain: @hse.ru + """ + email_domain = email.lower().split("@")[-1] + + if email_domain == "edu.hse.ru": + return True + elif email_domain == "hse.ru": + return False + + logging.error("Wrong HSE email domain: '%s'", email_domain) + + +def is_hse_email(email: str) -> bool: + """ + Check email is valid HSE corp. email + + :param email, required - email address to check. + """ + if re.fullmatch(r"^[a-z0-9\._-]{3,}@(edu\.)?hse\.ru$", email.lower()): + return True + logging.debug("Incorrect HSE email '%s'.", email) + return False + + +def get_formated_date(day_bias: int or float=0) -> str: + """ + Return date in RUZ API compatible format + + :param day_bias - number of day from now. + """ + return (datetime.now() + timedelta( + days=float(day_bias) + )).strftime("%Y.%m.%d") + + +@log() +def is_valid_hse_email(email: str) -> bool: + """ + Check email is valid via API endpoint call (schedule) + + :param email - email address to check (for schedules only). + """ + @none_safe() + def request_schedule_api(**params) -> list or dict: + return request.urlopen(make_url( + "schedule", + email=email, + fromDate=get_formated_date(), + toDate=get_formated_date(1), + **params + )) + + email = email.strip().lower() + if not is_hse_email(email): + return False + + try: + response = request_schedule_api( + receiverType=1 if not is_student(email) else None + ) + del response + except (error.HTTPError, error.URLError) as err: + logging.debug("Email '%s' wasn't verified.\n%s", email, err) + return False + return True + + +# ===== Special methods ===== + +@log() +def is_valid_schema(endpoint: str, + check_email_online: bool=CHECK_EMAIL_ONLINE, + **params) -> bool: + """ + Check params fit schema for certain endpoint + + :param endpoint - endpoint for request. + :param check_email_online - use is_valid_hse_email. + :param params - schema params. + """ + + if (endpoint == "schedule" and "lecturerOid" not in params and + "studentOid" not in params and "email" not in params and + "auditoriumOid" not in params): + logging.debug("One of the followed required: lecturer_id, " + "auditorium_id, student_id, email for " + "schedule endpoint.") + return False + + if params.get('email') is not None: + email = params['email'] + if not is_hse_email(email): + del email + return False + elif check_email_online and not is_valid_hse_email(email): + logging.warning("'%s' is not verified by API call.", email) + del email + + endpoint = RUZ_API_ENDPOINTS.get(endpoint) + if endpoint is None: + logging.warning("Can't find endpoint: '%s'.", endpoint) + del endpoint + return False + + schema = REQUEST_SCHEMA[endpoint] + for key, value in params.items(): + if key not in schema: + logging.warning("Can't find '%s' schema param: '%s'", + endpoint, key) + del schema, endpoint + return False + if not isinstance(value, schema[key]): + logging.warning("Expected {} for '{}'::'{}' got: {}", + schema[key], endpoint, key, type(value)) + del schema, endpoint + return False + del schema, endpoint + return True + + +@log() +def make_url(endpoint: str, **params) -> str: + """ + Creates URL for API requests + + :param endpoint - endpoint for request. + :param params - request params. + """ + url = "".join((RUZ_API_URL, RUZ_API_ENDPOINTS[endpoint])) + if params: + return "?".join((url, parse.urlencode(params))) + return url + + +@none_safe() +@log() +def get(endpoint: str, + encoding: str="utf-8", + return_none_safe: bool=USE_NONE_SAFE_VALUES, + **params) -> (list, dict, None): + """ + Return requested data in JSON + + Check request has correct schema. + + :param endpoint - endpoint for request. + :param encoding - encoding for received data. + :param return_none_safe - return empty list on fallback. + :param params - requested params + """ + if not is_valid_schema(endpoint, **params): + return [] if return_none_safe else None + + url = make_url(endpoint, **params) + try: + response = request.urlopen(url) + return json.loads(response.read().decode(encoding)) + except (error.HTTPError, error.URLError) as err: + logging.warning("Can't get '%s'.\n%s", url, err) + return [] if return_none_safe else None + + +# ===== API methods ===== + +def schedules(emails: Iterable=None, + lecturer_ids: Iterable=None, + auditorium_ids: Iterable=None, + student_ids: Iterable=None, + **params) -> map: + """ + Classes schedule for multiply students/lecturers as generator + + See RUZ::person_lessons for more details. + One of the followed required: lecturer_ids, auditorium_ids, + student_ids, emails. Throw an exception. + + :param emails - emails on hse.ru (edu.hse.ru for students). + :param lecturer_ids - IDs of teacher. + :param auditorium_ids - IDs of auditorium. + :param student_ids - IDs of student. + """ + def get_handler(key: str) -> Callable: + def func(val: dict) -> list or dict: + return person_lessons(**{key: val}, **params) + return func + + if emails: + return map(get_handler("email"), emails) + elif lecturer_ids: + return map(get_handler("lecturer_id"), lecturer_ids) + elif auditorium_ids: + return map(get_handler("auditorium_id"), auditorium_ids) + elif student_ids: + return map(get_handler("student_id"), student_ids) + + raise ValueError("One of the followed required: lecturer_ids, " + "auditorium_ids, student_ids, emails") + + +def person_lessons(email: str=None, + from_date: str=get_formated_date(), + to_date: str=get_formated_date(6), # one week + receiver_type: int=None, + lecturer_id: int=None, + auditorium_id: int=None, + student_id: int=None, + **params) -> list: + """ + Return classes schedule (for week by default) + + Automatically choose receiver type from given email address. + There is no need to specify receiver type for students explicitly. + One of the followed required: lecturer_id, auditorium_id, + student_id, email. Throws an exception. + Default values (fromDate, toDate) are set to return schedule for + one week from now. + + :param email - email on hse.ru (edu.hse.ru for students). + :param from_date, required - start of the period YYYY.MM.DD. + :param to_date, required - end of the period YYYY.MM.DD. + :param receiver_type - type of the schedule + (1/2/3 for teacher/auditorium/student). + :param lecturer_id - ID of teacher. + :param auditorium_id - ID of auditorium. + :param student_id - ID of student. + :param check_online :type bool - online verification for email. + :param safe :type bool - return something even if no data received. + """ + if receiver_type is None: + if email is not None and not is_student(email): + logging.debug("Detect lecturer email: '%s'.", email) + receiver_type = 1 + elif lecturer_id is not None: + logging.debug("Detect lecturer %d.", lecturer_id) + receiver_type = 1 + elif auditorium_id is not None: + logging.debug("Detect auditorium %d.", auditorium_id) + receiver_type = 2 + elif receiver_type == 3: + receiver_type = None + + return get( + "schedule", + fromDate=from_date, + toDate=to_date, + email=email, + receiverType=receiver_type, + lecturerOid=lecturer_id, + auditoriumOid=auditorium_id, + studentOid=student_id, + **params + ) + + +def groups(faculty_id: int=None) -> list: + """ + Return collection of groups + + :param faculty_id - course ID. + """ + return get("groups", facultyOid=faculty_id) + + +def staff_of_group(group_id: int) -> list: + """ + Return collection of students in group + + :param group_id, required - group' ID. + """ + return get("staffOfGroup", groupOid=group_id) + + +@lru_cache(maxsize=1) +def streams(reset_cache: bool=False) -> list: + """ + Return collection of study streams + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("streams") + + +def lecturers(chair_id: int=None) -> list: + """ + Return collection of teachers + + :param chair_id - ID of department. + """ + return get("lecturers", chairOid=chair_id) + + +def auditoriums(building_id: int=None) -> list: + """ + Return collection of auditoriums + + :param building_id - ID of building. + """ + return get("auditoriums", buildingOid=building_id) + + +@lru_cache(maxsize=1) +def type_of_auditoriums(reset_cache: bool=False) -> list: + """ + Return collection of auditoriums' types + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("typeOfAuditoriums") + + +@lru_cache(maxsize=1) +def kind_of_works(reset_cache: bool=False) -> list: + """ + Return collection of classes' types + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("kindOfWorks") + + +@lru_cache(maxsize=1) +def buildings(reset_cache: bool=False) -> list: + """ + Return collection of buildings + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("buildings") + + +@lru_cache(maxsize=1) +def faculties(reset_cache: bool=False) -> list: + """ + Return collection of learning programs + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("faculties") + + +def chairs(faculty_id: int=None) -> list: + """ + Return collection of departments + + :param faculty_id - ID of course (learning program). + """ + return get("chairs", facultyOid=faculty_id) + + +@lru_cache(maxsize=1) +def sub_groups(reset_cache: bool=False) -> list: + """ + Return collection of subgroups + + Cache requested values. + :param reset - use to reset cached value. + """ + return get("subGroups") + + +# ===== Additional methods ===== + +def find_by_str(subject: str or Callable, + query: str, + by: str="name", + **params) -> list: + """ + Linear search for subject by given text field (as query) + + Search method is very straightforward. For more complex searches + use custom implementation. + Throws an exception: + * KeyError if no subject found. + * NotImplementedError if method is not implemented for subject. + + :param subject - subject to find: possible variants: + * buildings: 'name', 'address', 'abbr'; + * faculties: 'name', 'institute', 'abbr'; + * sub_groups: 'name', 'group', 'abbr'; + * streams: 'name', 'faculty', 'abbr', 'formOfEducation', 'course'; + * type_of_auditoriums: 'name', 'abbr'; + * kind_of_works: 'name', 'abbr'; + * chairs: 'name', 'faculty', 'abbr'; + * auditoriums: 'number', 'building', 'typeOfAuditorium'; + * lecturers: 'chair', 'fio', 'shortFIO'; + * groups: 'faculty', 'formOfEducation', 'number', 'speciality'; + * staff_of_group: 'fio', 'shortFIO'; + * person_lessons: 'building', 'date', 'beginLesson', 'auditorium', + 'dateOfNest', 'dayOfWeekString', 'detailInfo', 'discipline', + 'disciplineinplan', 'endLesson', 'kindOfWork', 'lecturer', + 'stream'. + :param query - text query to find. + :param by - search field. + """ + SUBJECTS = { + buildings.__name__: buildings, + faculties.__name__: faculties, + sub_groups.__name__: sub_groups, + streams.__name__: streams, + type_of_auditoriums.__name__: type_of_auditoriums, + kind_of_works.__name__: kind_of_works, + chairs.__name__: chairs, + auditoriums.__name__: auditoriums, + lecturers.__name__: lecturers, + groups.__name__: groups, + staff_of_group.__name__: staff_of_group, + person_lessons.__name__: person_lessons + } + + if not isinstance(subject, Callable): + subject = SUBJECTS[subject] + elif subject.__name__ not in SUBJECTS.keys(): + raise NotImplementedError(subject.__name__) + + query = query.strip().lower() + return [el for el in subject(**params) if query in el[by].lower().strip()] diff --git a/ruz/utils/schema.py b/ruz/schema.py similarity index 95% rename from ruz/utils/schema.py rename to ruz/schema.py index 706db6a..023b71b 100644 --- a/ruz/utils/schema.py +++ b/ruz/schema.py @@ -1,4 +1,6 @@ -# collection of API endpoints (and their aliases) +from ruz.utils import RUZ_API_V + +# collection of API endpoints and their aliases RUZ_API_ENDPOINTS = { 'schedule': r"personLessons", 'lessons': r"personLessons", @@ -23,12 +25,13 @@ 'subgroups': r"subGroups", 'sub_groups': r"subGroups" } -RUZ_API2_ENDPOINTS = { - 'schedule': r"timetable/lessons", - 'lessons': r"timetable/lessons", - 'person_lessons': r"timetable/lessons", - 'personLessons': r"timetable/lessons" -} +if RUZ_API_V == 3: + RUZ_API_ENDPOINTS = { + 'schedule': r"timetable/lessons", + 'lessons': r"timetable/lessons", + 'person_lessons': r"timetable/lessons", + 'personLessons': r"timetable/lessons" + } # type rules to make request for API REQUEST_SCHEMA = { diff --git a/ruz/utils.py b/ruz/utils.py new file mode 100644 index 0000000..cb06e99 --- /dev/null +++ b/ruz/utils.py @@ -0,0 +1,36 @@ +import os +from collections import Callable +from functools import wraps + + +# ===== Constants ===== + +RUZ_API_URL = os.environ.get("RUZ_API_URL", + r"http://92.242.58.221/ruzservice.svc/") + +# detect RUZ API version from possible RUZ API URLs +RUZ_API_V = 1 +if RUZ_API_URL == r"http://92.242.58.221/ruzservice.svc/v2/": + RUZ_API_V = 2 +elif RUZ_API_URL == r"https://www.hse.ru/api/": + RUZ_API_V = 3 + +# default values +CHECK_EMAIL_ONLINE = bool(os.environ.get("CHECK_EMAIL_ONLINE", False)) +USE_NONE_SAFE_VALUES = bool(os.environ.get("USE_NONE_SAFE_VALUES", True)) + + +# ===== Decorators ===== + +def none_safe(args: bool=True, kwargs: bool=True) -> Callable: + """ Pass only not None args/kwargs to function """ + def decor(func: Callable) -> Callable: + @wraps(func) + def wrapper(*_args, **_kwargs) -> object: + if args: + _args = [arg for arg in _args if arg is not None] + if kwargs: + _kwargs = {k: v for k, v in _kwargs.items() if v is not None} + return func(*_args, **_kwargs) + return wrapper + return decor diff --git a/ruz/utils/__init__.py b/ruz/utils/__init__.py deleted file mode 100644 index 2e38c9b..0000000 --- a/ruz/utils/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -import os - -from ruz.utils.decorators import none_safe -from ruz.utils.logging import Logger, log -from ruz.utils.schema import (REQUEST_SCHEMA, RESPONSE_SCHEMA, - RUZ_API_ENDPOINTS, RUZ_API2_ENDPOINTS) - - -RUZ_API_URL = r"http://92.242.58.221/ruzservice.svc/" -RUZ_API2_URL = r"https://www.hse.ru/api/" -EMAIL_PATTERN = r"\b[a-zA-Z0-9\._-]{2,}@([a-zA-Z]{2,}\.)?[a-zA-Z]{2,}\.ru\b" -EMAIL_DOMAINS = (r"hse.ru", r"edu.hse.ru") diff --git a/ruz/utils/decorators.py b/ruz/utils/decorators.py deleted file mode 100644 index f4df8b9..0000000 --- a/ruz/utils/decorators.py +++ /dev/null @@ -1,56 +0,0 @@ -from collections import Callable -from functools import wraps - - -def abs_none_safe(func: Callable) -> Callable: - """ - Absolute None safe decorator - - Pass only not None kwargs and also args to function (no None at all) - - :param func, required - wrapped function/method. - - Usage - ----- - @abs_none_safe - def some_func(*args, **kwargs): - pass # all None args/kwargs will be filtered - """ - @wraps(func) - def wrapper(*args, **kwargs) -> object: - params = [] - for arg in args: - if arg is not None: - params.append(arg) - param_dict = {} - for key, value in kwargs.items(): - if value is not None: - param_dict[key] = value - result = func(*params, **param_dict) - return result - wrapper.__doc__ = func.__doc__ - return wrapper - - -def none_safe(func: Callable) -> Callable: - """ - Pass only not None kwargs to function (args can be None) - - :param func, required - wrapped function/method. - - Usage - ----- - @none_safe - def some_func(*args, **kwargs): - pass # only not None kwargs will be passed - """ - @wraps(func) - def wrapper(*args, **kwargs) -> object: - param_dict = {} - for key, value in kwargs.items(): - if value is not None: - param_dict[key] = value - result = func(*args, **param_dict) - return result - wrapper.__doc__ = func.__doc__ - return wrapper diff --git a/ruz/utils/logging.py b/ruz/utils/logging.py deleted file mode 100644 index 5dc3402..0000000 --- a/ruz/utils/logging.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -import os -from collections import Callable -from functools import wraps - - -LEVELS = { - "TEST": logging.DEBUG, - "PROD": logging.INFO -} - - -def Logger(name: str, level: int=logging.INFO, **kwargs) -> logging.RootLogger: - """ - Creates configured logger - - :param name, required - name for logger. - :param level - logging level. - :param format, str - logging format. - - Usage - ----- - logger = Logger(__name__) - - >>> Logger("some name").info("Hello, world!") - ... - some name - INFO - Hello, world! - >>> Logger() - Traceback (most recent call last): - ... - TypeError: Logger() missing 1 required positional argument: 'name' - >>> Logger(123) - Traceback (most recent call last): - ... - ValueError: Expect str, got: - """ - if not isinstance(name, str): - raise ValueError("Expect str, got: {}".format(type(name))) - logging.basicConfig( - format=kwargs.pop( - "format", - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ), - level=level - ) - return logging.getLogger(name) - - -def log(func: Callable) -> Callable: - """ - Log function entering, arguments and exiting (to debug) - - :param func, required - wrapped function/method. - - Usage - ----- - @log - def some_func(): - pass - """ - logger = Logger( - name="{}::{}".format(func.__module__, func.__name__), - level=LEVELS[os.environ.get("logging_level", "PROD")] - ) - - @wraps(func) - def wrapper(*args, **kwargs) -> object: - logger.debug("Entering: %s", func.__name__) - for arg in args: - logger.debug("arg::%s", arg) - for key, value in kwargs.items(): - logger.debug("kwarg::%s=%s", key, value) - result = func(*args, **kwargs) - logger.debug("Exiting: %s", func.__name__) - return result - wrapper.__doc__ = func.__doc__ - return wrapper - - -if __name__ == "__main__": - import doctest - doctest.testmod() diff --git a/setup.py b/setup.py index 18b0018..56faeeb 100644 --- a/setup.py +++ b/setup.py @@ -12,30 +12,56 @@ setup( name="hse_ruz", packages=packages, - version="1.1.0", + version="2.0.1", description="Python wrapper for HSE RUZ API", long_description=open(os.path.join(os.path.dirname(__file__), "README.rst")).read(), - author="hell03end", + author="Dmitriy Pchelkin | hell03end", author_email="hell03end@outlook.com", url="https://github.com/hell03end/hse_ruz", keywords="HSE RUZ API", classifiers=[ + # "Development Status :: 1 - Planning", + # "Development Status :: 2 - Pre-Alpha", + # "Development Status :: 3 - Alpha", + "Development Status :: 4 - Beta", + # "Development Status :: 5 - Production/Stable", + # "Development Status :: 6 - Mature", + # "Development Status :: 7 - Inactive", + "Environment :: Console", + "Environment :: Plugins", + # "Intended Audience :: Customer Service", "Intended Audience :: Developers", "Intended Audience :: Education", - "Development Status :: 5 - Production/Stable", + # "Intended Audience :: End Users/Desktop", + # "Intended Audience :: Financial and Insurance Industry", + # "Intended Audience :: Healthcare Industry", + # "Intended Audience :: Information Technology", + # "Intended Audience :: Legal Industry", + # "Intended Audience :: Manufacturing", + # "Intended Audience :: Other Audience", + # "Intended Audience :: Religion", + # "Intended Audience :: Science/Research", + # "Intended Audience :: System Administrators", + # "Intended Audience :: Telecommunications Industry", "License :: OSI Approved :: MIT License", + "Natural Language :: English", + "Natural Language :: Russian", "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.6", + # "Programming Language :: Python :: 3.3", + # "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.4", - "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", - "Topic :: Software Development :: Libraries :: Python Modules" + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Utilities" ], license="MIT License", platforms=["All"], - python_requires=">=3.3" + python_requires=">=3.5" ) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..e8c25ed 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,28 @@ +""" + Tests for RUZ API python module + + DANGEROUS: trusted fixtures may be deactivated in few years! +""" + +import logging + +logging.basicConfig( + level=logging.DEBUG, + format="[%(asctime)s] %(levelname)s " + "[%(name)s.{%(filename)s}.%(funcName)s:%(lineno)d] %(message)s", + datefmt="%H:%M:%S" +) + +__version__ = (2, 0, 1) + +# ===== Fixtures ===== +TRUSTED_EMAILS = { + 'student': r"dapchelkin@edu.hse.ru", + 'lecturer': r"aromanov@hse.ru" +} +INTRUSTED_EMAILS = { + 'other': r"hell03end@outlook.com", + 'hse': r"hell03end@hse.ru" +} +TRUSTED_GROUP_ID = 7699 +TRUSTED_LECTURER_ID = 6232 diff --git a/tests/test_RUZ.py b/tests/test_RUZ.py index d5170d3..68a3258 100644 --- a/tests/test_RUZ.py +++ b/tests/test_RUZ.py @@ -1,297 +1,129 @@ +""" Minimal tests for ruz functionality """ + import re -from time import sleep import pytest -from ruz import EMAIL_DOMAINS, REQUEST_SCHEMA, RUZ -from ruz.utils import RUZ_API2_ENDPOINTS as RUZ_ENDPOINTS2 -from ruz.utils import RUZ_API2_URL as RUZ_URL2 -from ruz.utils import RUZ_API_ENDPOINTS as RUZ_ENDPOINTS -from ruz.utils import RUZ_API_URL as RUZ_URL -from ruz.utils.logging import Logger - - -class TestRUZ: - def setup_class(self): - self.api = RUZ(strict_v1=True) - self.api2 = RUZ(strict_v1=False) - self._logger = Logger(str(self.__class__)) - - def setup_method(self): - sleep(0.01) - - def _test_simple_endpoint(self, response: object, - resp_type: type=list) -> None: - assert response is not None - assert isinstance(response, resp_type) - del response - - def _test_endpoint_with_kwargs(self, response: tuple, - resp_type: type=(list, None)) -> None: - assert response[0] is not None # full response - assert response[1] is not None # response with arg - assert response[2] is not None # response with duplicated arg - assert isinstance(response[0], resp_type) - assert isinstance(response[1], resp_type) - assert isinstance(response[2], resp_type) - assert len(response[1]) < len(response[0]) - assert response[1] == response[2] - del response +from tests import (INTRUSTED_EMAILS, TRUSTED_EMAILS, TRUSTED_LECTURER_ID, + logging) - def _test_endpoint_with_args(self, response: tuple, - resp_type: type=(list, None)) -> None: - assert response[0] is not None # response with arg - assert response[1] is not None # response with duplicated arg - assert isinstance(response[0], resp_type) - assert isinstance(response[1], resp_type) - assert response[0] == response[1] +import ruz +from ruz.schema import RUZ_API_ENDPOINTS +from ruz.utils import RUZ_API_URL - def test___init__(self): - api = RUZ( - base_url="123", - endpoints={'a': 123}, - schema={'a': int}, - domains=('abc',) - ) - assert api._url == "123" - assert api._endpoints == {'a': 123} - assert api._schema == {'a': int} - assert api._domains == ('abc',) - with pytest.raises(PermissionError) as excinfo: - api = RUZ(base_url=123) - assert excinfo - for key in ("endpoints", "schema", "domains"): - self._logger.debug("key::%s", key) - with pytest.raises(ValueError) as excinfo: - api = RUZ(**{key: 123}) - assert excinfo +# ===== Common methods ===== - def test_ok(self): - assert self.api.ok - assert self.api2.ok +def test_is_student(): + assert not ruz.is_student(TRUSTED_EMAILS['lecturer']) + assert ruz.is_student(TRUSTED_EMAILS['student']) + assert ruz.is_student(INTRUSTED_EMAILS['other']) is None - def test_schema(self): - assert self.api.schema == REQUEST_SCHEMA - assert self.api.schema is not self.api.schema # deepcopy - assert self.api2.schema == REQUEST_SCHEMA - assert self.api2.schema is not self.api.schema - def test_v(self): - assert self.api.v == 1 - assert self.api2.v == 2 +def test_is_hse_email(): + for email in ("somemail@hse.com", "somem@il@edu.hse.ru", + "somemail@google.ru", INTRUSTED_EMAILS['other']): + assert not ruz.is_hse_email(email) + for email in (INTRUSTED_EMAILS['hse'], TRUSTED_EMAILS['student'], + TRUSTED_EMAILS['lecturer']): + assert ruz.is_hse_email(email) - def test_domains(self): - assert self.api.domains == EMAIL_DOMAINS - assert isinstance(self.api.domains, tuple) - assert self.api2.domains == EMAIL_DOMAINS - assert isinstance(self.api2.domains, tuple) - def test_endpoints(self): - assert self.api.endpoints == RUZ_ENDPOINTS - assert self.api.endpoints is not self.api.endpoints - assert self.api2.endpoints == RUZ_ENDPOINTS - assert self.api2.endpoints is not self.api.endpoints +# minimal functionality test +def test_is_valid_hse_email(): + for email in TRUSTED_EMAILS.values(): + assert ruz.is_valid_hse_email(email) + for email in INTRUSTED_EMAILS.values(): + assert not ruz.is_valid_hse_email(email) - def test_is_student(self): - assert not self.api.is_student("somemail@hse.ru") - assert self.api.is_student("somemail@edu.hse.ru") - with pytest.raises(ValueError) as excinfo: - self.api.is_student(123) - assert excinfo - with pytest.raises(ValueError) as excinfo: - self.api.is_student("somemail@gmail.com") - assert excinfo - def test__make_url(self): - for key, endpoint in RUZ_ENDPOINTS.items(): - assert self.api._make_url(key) == RUZ_URL + endpoint - assert self.api._make_url(key, v=2) == RUZ_URL + endpoint - assert self.api2._make_url(key, v=1) == RUZ_URL + endpoint - for key, endpoint in RUZ_ENDPOINTS2.items(): - assert self.api2._make_url(key, v=2) == RUZ_URL2 + endpoint - with pytest.raises(KeyError): - self.api._make_url("") +def test_get_formated_date(): + with pytest.raises(ValueError): + ruz.get_formated_date("x") - # TODO - def test__request(self) -> NotImplemented: - return NotImplemented + for bias in (1, 1.1, 1., "1", "1.1", -1, 0, "-1", -1, "-1.", "01."): + ruz_date = ruz.get_formated_date(bias) + assert ruz_date + assert isinstance(ruz_date, str) + assert len(ruz_date) == 10 + assert re.match(r"[\d]{4}(\.)[\d]{2}\1[\d]{2}", ruz_date) - # TODO: test all cases (type of param, etc.) - def test__verify_schema(self): - with pytest.raises(KeyError) as excinfo: - self.api._verify_schema("") - assert excinfo - for endpoint in RUZ_ENDPOINTS: - if endpoint == 'schedule': - with pytest.raises(ValueError) as excinfo: - self.api._verify_schema(endpoint, email="123") - assert excinfo - with pytest.raises(ValueError) as excinfo: - self.api._verify_schema(endpoint) - assert excinfo - else: - with pytest.raises(KeyError) as excinfo: - self.api._verify_schema(endpoint, tmp=None) - assert excinfo - assert not self.api._verify_schema(endpoint) + assert ruz.get_formated_date(0) > ruz.get_formated_date(-1) + assert ruz.get_formated_date(0) < ruz.get_formated_date(1) + assert ruz.get_formated_date(0) == ruz.get_formated_date(0.000001) - def test_check_email(self): - correct_emails = ("somemail@edu.hse.ru", "somemail@hse.ru") - incorrect_emails = ("somemail@hse.com", "somem@il@edu.hse.ru", - "somemail@google.ru") - for email in incorrect_emails: - with pytest.raises(ValueError) as excinfo: - self.api.check_email(email) - assert excinfo - for email in correct_emails: - assert self.api.check_email(email) is None - def test_date(self): - with pytest.raises(ValueError) as excinfo: - self.api.date(3.14) - assert excinfo - assert isinstance(self.api.date(), str) - assert len(self.api.date()) == 10 - assert re.match(r"[\d]{4}\.[\d]{2}\.[\d]{2}", self.api.date()) - assert self.api.date(-1) < self.api.date() - assert self.api.date(1) > self.api.date(0) +# ===== Special methods ===== - # DANGEROUS: email will be deactivated in few years - def test_verify_email(self): - correct_emails = ("somemail@edu.hse.ru", "somemail@hse.ru") - incorrect_emails = ("somemail@hse.com", "somem@il@edu.hse.ru", - "somemail@google.ru") - # offline - for email in incorrect_emails: - with pytest.raises(ValueError) as excinfo: - self.api.verify_email(email, check_online=False) - assert excinfo - for email in correct_emails: - assert self.api.verify_email(email, check_online=False) is None - with pytest.raises(ValueError) as excinfo: - self.api.verify_email(correct_emails[0], receiver_type=-1, - check_online=False) - assert excinfo - with pytest.raises(ValueError) as excinfo: - self.api.verify_email(correct_emails[0], receiver_type=2, - check_online=False) - assert excinfo - # online - for email in correct_emails: - with pytest.raises(ValueError) as excinfo: - self.api.verify_email(email, check_online=True) - assert excinfo - assert self.api.verify_email("dapchelkin@edu.hse.ru") is None - - # TODO: test all cases (safe key, None kwargs, etc.) - def test__get(self): - incorrect_email = "somemail@hse.com" - for endpoint in RUZ_ENDPOINTS: - if endpoint == "schedule": - with pytest.raises(ValueError) as excinfo: - self.api._get(endpoint, email=incorrect_email) - assert excinfo - else: - with pytest.raises(KeyError) as excinfo: - self.api._get(endpoint, tmp=123) - assert excinfo - assert self.api._get("kind_of_works") - - def test__map_schedules(self): - assert isinstance(self.api._map_schedules("email", ("123",)), map) - assert self.api._map_schedules("email", "123", allowed_types=(str,)) - assert self.api._map_schedules("email", 123, allowed_types=(int,)) - with pytest.raises(ValueError) as excinfo: - self.api._map_schedules("email", "123", allowed_types=(int,)) - assert excinfo - - # TODO: test auditoriums and students - def test_schedules(self): - with pytest.raises(ValueError) as excinfo: - self.api.schedules() - assert excinfo - assert isinstance(self.api.schedules(emails=["abc"]), map) - assert next(self.api.schedules(emails="dapchelkin@edu.hse.ru", - safe=False)) is not None - assert self.api.schedules(lecturer_ids=[6232], safe=False) - - # DANGEROUS: email will be deactivated in few years - # TODO: test auditorium_id, student_id, lecturer_id keys - def test_person_lessons(self): - student_email = "dapchelkin@edu.hse.ru" - lecturer_email = "aromanov@hse.ru" - incorrect_email = "somemail@hse.ru" - self._test_endpoint_with_args(( - self.api.person_lessons(lecturer_email), - self.api.person_lessons(lecturer_email, receiver_type=1) +def test_make_url(): + with pytest.raises(KeyError): + ruz.main.make_url("") + for key, endpoint in RUZ_API_ENDPOINTS.items(): + assert ruz.main.make_url(key) == RUZ_API_URL + endpoint + assert ruz.main.make_url(key, v=1) == "?".join(( + RUZ_API_URL + endpoint, "v=1" )) - assert self.api.person_lessons(student_email) is not None - assert self.api.person_lessons(incorrect_email, safe=True, - check_online=False) is not None - assert self.api.person_lessons(incorrect_email, safe=False, - check_online=False) is None - with pytest.raises(ValueError) as excinfo: - self.api.person_lessons(incorrect_email, check_online=True) - assert excinfo - for lecturer_id in (24577, 19000, 24187, 23867, 22349): - self._test_simple_endpoint( - self.api.person_lessons(lecturer_id=lecturer_id, safe=False) - ) - def test_groups(self): - self._test_endpoint_with_kwargs(( - self.api.groups(), - self.api.groups(faculty_id=5490), - self.api.groups(faculty_id=5577, facultyOid=5490) - )) - def test_staff_of_group(self): - self._test_endpoint_with_args(( - self.api.staff_of_group(group_id=7699), - self.api.staff_of_group(group_id=1, groupOid=7699) - )) - - def test_streams(self): - self._test_simple_endpoint(self.api.streams()) - - def test_staff_of_streams(self): - self._test_endpoint_with_args(( - self.api.staff_of_streams(stream_id=45771), - self.api.staff_of_streams(stream_id=1, streamOid=45771) - )) - - def test_lecturers(self): - self._test_endpoint_with_kwargs(( - self.api.lecturers(), - self.api.lecturers(chair_id=1166), - self.api.lecturers(chair_id=1146, chairOid=1166) - )) +# tests not all cases (type of param, etc.) +def test_is_valid_schema(): + assert not ruz.main.is_valid_schema("") + for endpoint in RUZ_API_ENDPOINTS.keys(): + if endpoint == 'schedule': + assert not ruz.main.is_valid_schema( + endpoint, email=INTRUSTED_EMAILS['other'] + ) + assert not ruz.main.is_valid_schema(endpoint) + for email in TRUSTED_EMAILS.values(): + assert ruz.main.is_valid_schema(endpoint, email=email, + check_email_online=False) + else: + assert not ruz.main.is_valid_schema(endpoint, tmp=None) + assert ruz.main.is_valid_schema(endpoint) + + +# tests only negative cases, positive cases are tested in test_schema +def test_get(): + for endpoint in RUZ_API_ENDPOINTS.keys(): + if endpoint == "schedule": + none_safe = ruz.main.get( + endpoint, + email=INTRUSTED_EMAILS['other'], + return_none_safe=True, + check_email_online=False + ) + usual = ruz.main.get( + endpoint, + email=INTRUSTED_EMAILS['other'], + return_none_safe=False, + check_email_online=False + ) + else: + none_safe = ruz.main.get(endpoint, tmp=123, return_none_safe=True) + usual = ruz.main.get(endpoint, tmp=123, return_none_safe=False) + assert not none_safe + assert isinstance(none_safe, list) + assert not usual + assert usual is None - def test_auditoriums(self): - self._test_endpoint_with_kwargs(( - self.api.auditoriums(), - self.api.auditoriums(building_id=2272), - self.api.auditoriums(building_id=2222, buildingOid=2272) - )) - def test_type_of_auditoriums(self): - self._test_simple_endpoint(self.api.type_of_auditoriums()) +# ===== API methods ===== - def test_kind_of_works(self): - self._test_simple_endpoint(self.api.kind_of_works()) +def test_schedules(): + with pytest.raises(ValueError) as excinfo: + ruz.schedules() + assert excinfo - def test_buildings(self): - self._test_simple_endpoint(self.api.buildings()) + for schedule in ruz.schedules(emails=TRUSTED_EMAILS.values(), + return_none_safe=False): + assert schedule is not None - def test_faculties(self): - self._test_simple_endpoint(self.api.faculties()) + schedule_map = ruz.schedules(lecturer_ids=[TRUSTED_LECTURER_ID], + return_none_safe=False) + assert isinstance(schedule_map, map) - def test_chairs(self): - self._test_endpoint_with_kwargs(( - self.api.chairs(), - self.api.chairs(faculty_id=5490), - self.api.chairs(faculty_id=5577, facultyOid=5490) - )) - def test_sub_groups(self): - self._test_simple_endpoint(self.api.sub_groups()) +def test_find_by_str(): + with pytest.raises(KeyError): + ruz.find_by_str("tmp", "some query") + with pytest.raises(NotImplementedError): + ruz.find_by_str(min, "some query") diff --git a/tests/test_schema.py b/tests/test_schema.py index 67692c9..e364734 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,127 +1,92 @@ -""" Test current response schema is still actual """ - -from ruz import RUZ -from ruz.utils import RESPONSE_SCHEMA -from ruz.utils.logging import Logger - - -class TestResponseSchema: - def setup_class(self): - self.api = RUZ(strict_v1=True) - self.api2 = RUZ(strict_v1=False) - self._logger = Logger(str(self.__class__)) - - def _test_schema(self, schema, response): - if isinstance(schema, type): - assert isinstance(response, schema) - else: - assert isinstance(response, type(schema)) - - if response and isinstance(response, list): - schema = schema[0] # schema describe only one element - for element in response: - assert isinstance(element, type(schema)) - for key, value in element.items(): - try: - assert isinstance(value, schema[key]) - except AssertionError as err: - self._logger.debug("param::%s", key) - raise err - # check missing keys (difference between schema and response) - if element.keys() != schema.keys(): - if len(element.keys()) > len(schema.keys()): - missed_keys = set(element.keys()) - set(schema.keys()) - else: - missed_keys = set(schema.keys()) - set(element.keys()) - self._logger.warning("missed keys::%s", missed_keys) - elif response and isinstance(response, dict): - for key, value in response.items(): - self._test_schema(schema[key], value) - - # DANGEROUS: email will be deactivated in few years - def test_schedule(self): - self._test_schema( - schema=RESPONSE_SCHEMA['schedule'], - response=self.api.person_lessons(email="dapchelkin@edu.hse.ru") - ) - - # DANGEROUS: email will be deactivated in few years - def test_schedule2(self) -> NotImplemented: - self._test_schema( - schema=RESPONSE_SCHEMA['schedule2'], - response=self.api2.person_lessons(email="dapchelkin@edu.hse.ru") - ) - - def test_groups(self): - self._test_schema( - schema=RESPONSE_SCHEMA['groups'], - response=self.api.groups() - ) - - # DANGEROUS: group id may be deactivated in few years - def test_staffOfGroup(self): - self._test_schema( - schema=RESPONSE_SCHEMA['staffOfGroup'], - response=self.api.staff_of_group(group_id=7699) - ) - - def test_streams(self): - self._test_schema( - schema=RESPONSE_SCHEMA['streams'], - response=self.api.streams() - ) - - # DANGEROUS: stream id may be deactivated in few years - def test_staffOfStreams(self): - self._test_schema( - schema=RESPONSE_SCHEMA['staffOfStreams'], - response=self.api.staff_of_streams(stream_id=0) - ) - - def test_lecturers(self): - self._test_schema( - schema=RESPONSE_SCHEMA['lecturers'], - response=self.api.lecturers() - ) - - def test_auditoriums(self): - self._test_schema( - schema=RESPONSE_SCHEMA['auditoriums'], - response=self.api.auditoriums() - ) - - def test_typeOfAuditoriums(self): - self._test_schema( - schema=RESPONSE_SCHEMA['typeOfAuditoriums'], - response=self.api.type_of_auditoriums() - ) - - def test_kindOfWorks(self): - self._test_schema( - schema=RESPONSE_SCHEMA['kindOfWorks'], - response=self.api.kind_of_works() - ) - - def test_buildings(self): - self._test_schema( - schema=RESPONSE_SCHEMA['buildings'], - response=self.api.buildings() - ) - - def test_faculties(self): - self._test_schema( - schema=RESPONSE_SCHEMA['faculties'], - response=self.api.faculties() - ) - - def test_chairs(self): - self._test_schema( - schema=RESPONSE_SCHEMA['chairs'], - response=self.api.chairs() - ) - - def test_subGroups(self): - self._test_schema( - schema=RESPONSE_SCHEMA['subGroups'], - response=self.api.sub_groups() - ) +""" Test that current response schema is still actual """ + +from tests import TRUSTED_EMAILS, TRUSTED_GROUP_ID, logging + +import ruz +from ruz.schema import RESPONSE_SCHEMA + + +def _test_schema(schema, response): + if isinstance(schema, type): + assert isinstance(response, schema) + else: + assert isinstance(response, type(schema)) + + if response and isinstance(response, list): + schema = schema[0] # schema describe only one element + for element in response: + assert isinstance(element, type(schema)) + for key, value in element.items(): + try: + assert isinstance(value, schema[key]) + except AssertionError as err: + logging.debug("param::%s", key) + raise err + # check missing keys (difference between schema and response) + if element.keys() != schema.keys(): + if len(element.keys()) > len(schema.keys()): + missed_keys = set(element.keys()) - set(schema.keys()) + else: + missed_keys = set(schema.keys()) - set(element.keys()) + logging.warning("missed keys::%s", missed_keys) + elif response and isinstance(response, dict): + for key, value in response.items(): + _test_schema(schema[key], value) + + +# ===== API methods ===== + +def test_schedule(): + logging.warning("Trusted emails may be deactivated in few years!") + _test_schema(schema=RESPONSE_SCHEMA['schedule'], + response=ruz.person_lessons(email=TRUSTED_EMAILS['student'])) + + +def test_groups(): + _test_schema(schema=RESPONSE_SCHEMA['groups'], response=ruz.groups()) + + +def test_staffOfGroup(): + logging.warning("Group id may be deactivated in few years!") + _test_schema(schema=RESPONSE_SCHEMA['staffOfGroup'], + response=ruz.staff_of_group(group_id=TRUSTED_GROUP_ID)) + + +def test_streams(): + _test_schema(schema=RESPONSE_SCHEMA['streams'], response=ruz.streams()) + + +def test_lecturers(): + _test_schema(schema=RESPONSE_SCHEMA['lecturers'], response=ruz.lecturers()) + + +def test_auditoriums(): + _test_schema(schema=RESPONSE_SCHEMA['auditoriums'], + response=ruz.auditoriums()) + + +def test_typeOfAuditoriums(): + _test_schema(schema=RESPONSE_SCHEMA['typeOfAuditoriums'], + response=ruz.type_of_auditoriums()) + + +def test_kindOfWorks(): + _test_schema(schema=RESPONSE_SCHEMA['kindOfWorks'], + response=ruz.kind_of_works()) + + +def test_buildings(): + _test_schema(schema=RESPONSE_SCHEMA['buildings'], response=ruz.buildings()) + + +def test_faculties(): + _test_schema(schema=RESPONSE_SCHEMA['faculties'], response=ruz.faculties()) + + +def test_chairs(): + _test_schema(schema=RESPONSE_SCHEMA['chairs'], response=ruz.chairs()) + + +def test_subGroups(): + _test_schema(schema=RESPONSE_SCHEMA['subGroups'], + response=ruz.sub_groups()) diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index 4027fc7..0000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,40 +0,0 @@ -from ruz.utils.decorators import abs_none_safe, none_safe -from ruz.utils.logging import log - -ARGS = (1, 2, None, 4, None) -SAFE_ARGS = (1, 2, 4) -KWARGS = {'a': 1, 'b': 2, 'c': None, 'd': 4, 'e': None} -SAFE_KWARGS = {'a': 1, 'b': 2, 'd': 4} - - -@log -def func_log(*args, **kwargs) -> tuple: - """ Fixture for test_log """ - return args, kwargs - - -@abs_none_safe -def func_abs_none_safe(*args, **kwargs) -> tuple: - """ Fixture for test_abs_none_safe """ - return args, kwargs - - -@none_safe -def func_none_safe(*args, **kwargs) -> tuple: - """ Fixture for test_none_safe """ - return args, kwargs - - -def test_log(): - assert func_log(*ARGS, **KWARGS) == (ARGS, KWARGS) - assert func_log.__doc__ == " Fixture for test_log " - - -def test_abs_none_safe(): - assert func_abs_none_safe(*ARGS, **KWARGS) == (SAFE_ARGS, SAFE_KWARGS) - assert func_abs_none_safe.__doc__ == " Fixture for test_abs_none_safe " - - -def test_none_safe(): - assert func_none_safe(*ARGS, **KWARGS) == (ARGS, SAFE_KWARGS) - assert func_none_safe.__doc__ == " Fixture for test_none_safe "