diff --git a/makefile b/Makefile similarity index 100% rename from makefile rename to Makefile diff --git a/README.md b/README.md index a276a19..d98481f 100644 --- a/README.md +++ b/README.md @@ -48,83 +48,94 @@ Install the cli `whois` of your operating system if it is not present already Install `whois` package from your distribution (e.g apt install whois) -``` -$pip install whois ->>> import whois ->>> domain = whois.query('google.com') + $pip install whois ->>> print(domain.__dict__) -{ - 'expiration_date': datetime.datetime(2020, 9, 14, 0, 0), - 'last_updated': datetime.datetime(2011, 7, 20, 0, 0), - 'registrar': 'MARKMONITOR INC.', - 'name': 'google.com', - 'creation_date': datetime.datetime(1997, 9, 15, 0, 0) -} + >>> import whois + >>> domain = whois.query('google.com') ->>> print(domain.name) -google.com + >>> print(domain.__dict__) + { + 'expiration_date': datetime.datetime(2020, 9, 14, 0, 0), + 'last_updated': datetime.datetime(2011, 7, 20, 0, 0), + 'registrar': 'MARKMONITOR INC.', + 'name': 'google.com', + 'creation_date': datetime.datetime(1997, 9, 15, 0, 0) + } ->>> print(domain.expiration_date) -2020-09-14 00:00:00 -``` + >>> print(domain.name) + google.com + + >>> print(domain.expiration_date) + 2020-09-14 00:00:00 ## ccTLD & TLD support see the file: ./whois/tld_regexpr.py or call whois.validTlds() ## Issues -Raise an issue https://github.com/DannyCork/python-whois/issues/new + * Raise an issue https://github.com/DannyCork/python-whois/issues/new -## Changes: -2022-06-09: maarten_boot: +## Changes: 2022-06-09: maarten_boot: * the returned list of name_servers is now a sorted unique list and not a set * the help function whois.validTlds() now outputs the true tld with dots -2022-09-27: maarten_boot +## 2022-09-27: maarten_boot * add test2.py to replace test.py * ./test2.py -h will show the possible usage * all tests from the original program are now files in the ./tests directory * test can be done on all supported tld's with -a or --all and 
limitest by regex with -r or --reg= -2022-11-04: maarten_boot +## 2022-11-04: maarten_boot * add support for Iana example.com, example.net -2022-11-07: maarten_boot +## 2022-11-07: maarten_boot * add testing against static known data in dir: ./testdata//output * test.sh will test all domains in testdata without actually calling whois, the input data is instead read from testdata//input -2022-11-11: maarten_boot +## 2022-11-11: maarten_boot + * add support for returning the raw data from the whois command: flag include_raw_whois_text * add support for handling unsupported domains via whois raw text only: flag return_raw_text_for_unsupported_tld -2023-01-18: sorrowless +## 2023-01-18: sorrowless + * add an opportunity to specify maximum cache age -2023-01-25: maarten_boot +## 2023-01-25: maarten_boot + * convert the tld file to a Dict, we now no longer need a mappper for python keywords or second level domains. * utf8 level domains also need no mapper anymore an can be added as is with a translation to xn-- * added xn-- tlds for all known utf-8 domains we currently have * we can now add new domains on the fly or change them: whois.mergeExternalDictWithRegex(aDictToOverride) see example exampleExtend.py -2023-01-27: maarten_boot +## 2023-01-27: maarten_boot + * add autodetect via iana tld file (this has only tld's) * add a central collection of all compiled regexes and reuse them: REG_COLLECTION_BY_KEY in _0_init_tld.py * refresh testdata now that tld has dot instead of _ if more then one level * add additional strings meaning domain does not exist -2023-02-02: maarten_boot +## 2023-02-02: maarten_boot + * whois.QuotaStringsAdd(str) to add additional strings for over quota detection. whois.QuotaStrings() lists the current configured strings * whois.NoneStringsAdd(str) to add additional string for NoSuchDomainExists detection (whois.query() retuning None). 
whois.NoneStrings() lsts the current configured strings * suppress messages to stderr if not verbose=True -2023-07-20: maarten_boot -* sync with https://github.com/mboot-github/WhoisDomain; 1.20230720.1; (gov.tr), (com.ru, msk.ru, spb.ru), (option to preserve partial output after timeout) -* sync with https://github.com/mboot-github/WhoisDomain; 1.20230720.2; add t_test hint support; fix some server hints +## 2023-07-20: maarten_boot + + * sync with https://github.com/mboot-github/WhoisDomain; 1.20230720.1; (gov.tr), (com.ru, msk.ru, spb.ru), (option to preserve partial output after timeout) + * sync with https://github.com/mboot-github/WhoisDomain; 1.20230720.2; add t_test hint support; fix some server hints +## 2023-08-21: mboot-github (maarten_boot) + + * abandon any python below 3.9 (mypy compatibilities) + * major refactor into more object based approach and parameterContext + * allow custom caching backends (e.g. redis, dbm, ...) -2023-08-21: mboot-github (maarten_boot) -* abandon any python below 3.9 (mypy compatibilities) -* major refactor into more object based approcj and paramaterContext -* allow custom caching backends (e.g. redis, dbm, ...) +## 2023-09-22 see new parameters in whois/context/parameterContext.py + * Sync with latest whoisdomain + * Allow cleaning up the http(s) info in the status response. + * Allow correlation with tld (pip install tld) public_suffix. + * Allow display of what whois-servers were used until we reach the final item. 
diff --git a/whois/__init__.py b/whois/__init__.py index 7a1230e..9bc99c5 100755 --- a/whois/__init__.py +++ b/whois/__init__.py @@ -1,5 +1,17 @@ +# pylint: disable=duplicate-code +""" +Module providing all public accessible functions and data for the whoisdomain package + +## optional modules supported: + +- if the tld library is installed you can use the `withPublicSuffix:bool` option + +All public data is vizible via the __all__ List +""" + import sys import os +import logging from functools import wraps @@ -56,12 +68,15 @@ initLastWhois, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + HAS_REDIS = False try: import redis HAS_REDIS = True -except Exception as e: +except ImportError as e: _ = e if HAS_REDIS: @@ -82,7 +97,7 @@ import tld as libTld TLD_LIB_PRESENT = True -except Exception as e: +except ImportError as e: _ = e # ignore any error __all__ = [ @@ -196,6 +211,8 @@ def query( tryInstallMissingWhoisOnWindows: bool = False, shortResponseLen: int = 5, withPublicSuffix: bool = False, + extractServers: bool = False, + stripHttpStatus: bool = False, # if you use pc as argument all above params (except domain are ignored) ) -> Optional[Domain]: # see documentation about paramaters in parameterContext.py @@ -223,10 +240,12 @@ def query( withPublicSuffix=withPublicSuffix, shortResponseLen=shortResponseLen, tryInstallMissingWhoisOnWindows=tryInstallMissingWhoisOnWindows, + extractServers=extractServers, + stripHttpStatus=stripHttpStatus, ) - if verbose: - print(pc, file=sys.stderr) + msg = f"{pc}" + log.debug(msg) return q2(domain=domain, pc=pc) diff --git a/whois/cache/dbmCache.py b/whois/cache/dbmCache.py index 321cfcd..4d14770 100644 --- a/whois/cache/dbmCache.py +++ b/whois/cache/dbmCache.py @@ -1,10 +1,15 @@ -import sys +# import sys import dbm +import os +import logging from typing import ( Optional, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class 
DBMCache: def __init__( @@ -14,22 +19,20 @@ def __init__( ) -> None: self.verbose = verbose self.dbmFile = dbmFile - if self.verbose: - print(f"{type(self).__name__} verbose: {self.verbose}", file=sys.stderr) def get( self, keyString: str, ) -> Optional[str]: - if self.verbose: - print(f"{type(self).__name__} get: {keyString}", file=sys.stderr) + msg = f"{type(self).__name__} get: {keyString}" + log.debug(msg) with dbm.open(self.dbmFile, "c") as db: data = db.get(keyString, None) if data: sdata: str = data.decode("utf-8") - if self.verbose: - print(sdata, file=sys.stderr) + msg = f"{sdata}" + log.debug(msg) return sdata return None @@ -38,8 +41,8 @@ def put( keyString: str, data: str, ) -> str: - if self.verbose: - print(f"{type(self).__name__} put: {keyString}", file=sys.stderr) + msg = f"{type(self).__name__} put: {keyString}" + log.debug(msg) with dbm.open(self.dbmFile, "c") as db: db[keyString] = bytes(data, "utf-8") diff --git a/whois/cache/dummyCache.py b/whois/cache/dummyCache.py index a3da989..e3c10bc 100644 --- a/whois/cache/dummyCache.py +++ b/whois/cache/dummyCache.py @@ -1,9 +1,14 @@ -import sys +# import sys +import os +import logging from typing import ( Optional, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class DummyCache: def __init__( @@ -11,15 +16,11 @@ def __init__( verbose: bool = False, ) -> None: self.verbose = verbose - if self.verbose: - print(f"{type(self).__name__} verbose: {self.verbose}", file=sys.stderr) def get( self, keyString: str, ) -> Optional[str]: - if self.verbose: - print(f"{type(self).__name__} get: {keyString}", file=sys.stderr) return None def put( @@ -27,6 +28,4 @@ def put( keyString: str, data: str, ) -> str: - if self.verbose: - print(f"{type(self).__name__} put: {keyString}", file=sys.stderr) return data diff --git a/whois/cache/redisCache.py b/whois/cache/redisCache.py index 75fb6a6..ee6d31a 100644 --- a/whois/cache/redisCache.py +++ b/whois/cache/redisCache.py 
@@ -1,16 +1,23 @@ #! /usr/bin/env python3 +# pylint: disable=duplicate-code +# pylint disable=broad-exception-caught + +import os +import logging -import sys from typing import ( Optional, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + HAS_REDIS = False try: import redis HAS_REDIS = True -except Exception as e: +except ImportError as e: _ = e if HAS_REDIS: @@ -28,21 +35,14 @@ def __init__(self, verbose: bool = False, host: str = "localhost", port: int = 6 connection_pool=self.pool, ) - if self.verbose: - print(f"{type(self).__name__} verbose: {self.verbose}", file=sys.stderr) - def get( self, keyString: str, ) -> Optional[str]: - if self.verbose: - print(f"{type(self).__name__} get: {keyString}", file=sys.stderr) data = self.redis.get(keyString) if data: sdata: str = data.decode("utf-8") - if self.verbose: - print(sdata, file=sys.stderr) return sdata return None @@ -51,8 +51,6 @@ def put( keyString: str, data: str, ) -> str: - if self.verbose: - print(f"{type(self).__name__} put: {keyString}", file=sys.stderr) self.redis.set( keyString, diff --git a/whois/cache/simpleCacheBase.py b/whois/cache/simpleCacheBase.py index dec423e..9db2d4d 100755 --- a/whois/cache/simpleCacheBase.py +++ b/whois/cache/simpleCacheBase.py @@ -1,8 +1,10 @@ #! 
/usr/bin/env python3 import time -import sys +# import sys +import os +import logging from typing import ( Dict, @@ -10,6 +12,9 @@ Tuple, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class SimpleCacheBase: def __init__( @@ -21,16 +26,11 @@ def __init__( self.memCache: Dict[str, Tuple[float, str]] = {} self.cacheMaxAge: int = cacheMaxAge - if self.verbose: - print("DEBUG cache init SimpleCacheBase", file=sys.stderr) - def put( self, keyString: str, data: str, ) -> str: - if self.verbose: - print(f"DEBUG: cache put: {keyString}", file=sys.stderr) # store the currentTime and data tuple (time, data) self.memCache[keyString] = ( @@ -43,23 +43,14 @@ def get( self, keyString: str, ) -> Optional[str]: - if self.verbose: - print(f"DEBUG cache get: {keyString}", file=sys.stderr) cData = self.memCache.get(keyString) if cData is None: - if self.verbose: - print("get: no data", file=sys.stderr) return None t = time.time() hasExpired = cData[0] < (t - self.cacheMaxAge) if hasExpired is True: - if self.verbose: - print( - f"DEBUG: cache get: data has expired {keyString} {cData[0]}, {t}, {self.cacheMaxAge}", - file=sys.stderr, - ) return None return cData[1] diff --git a/whois/cache/simpleCacheWithFile.py b/whois/cache/simpleCacheWithFile.py index c124287..74d29f8 100755 --- a/whois/cache/simpleCacheWithFile.py +++ b/whois/cache/simpleCacheWithFile.py @@ -1,19 +1,20 @@ #! 
/usr/bin/env python3 -import sys import os import json - +import logging from typing import ( Optional, - # Tuple, ) from .simpleCacheBase import ( SimpleCacheBase, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class SimpleCacheWithFile(SimpleCacheBase): cacheFilePath: Optional[str] = None @@ -26,8 +27,6 @@ def __init__( ) -> None: super().__init__(verbose=verbose, cacheMaxAge=cacheMaxAge) self.cacheFilePath = cacheFilePath - if self.verbose: - print("init SimpleCacheWithFile", file=sys.stderr) def _fileLoad( self, @@ -38,14 +37,12 @@ def _fileLoad( if not os.path.isfile(self.cacheFilePath): return - if self.verbose: - print(f"fileLoad: {self.cacheFilePath}", file=sys.stderr) - - with open(self.cacheFilePath, "r") as f: + with open(self.cacheFilePath, "r", encoding="utf-8") as f: try: self.memCache = json.load(f) - except Exception as e: - print(f"ignore json load err: {e}", file=sys.stderr) + except ValueError as e: + msg = f"ignore json load err: {e}" + log.error(msg) def _fileSave( self, @@ -53,10 +50,7 @@ def _fileSave( if self.cacheFilePath is None: return - if self.verbose: - print(f"_fileSave: {self.cacheFilePath}", file=sys.stderr) - - with open(self.cacheFilePath, "w") as f: + with open(self.cacheFilePath, "w", encoding="utf-8") as f: json.dump(self.memCache, f) def put( diff --git a/whois/context/dataContext.py b/whois/context/dataContext.py index fb1a2fd..4dcb738 100644 --- a/whois/context/dataContext.py +++ b/whois/context/dataContext.py @@ -1,5 +1,8 @@ #! 
/usr/bin/env python3 +import os +import logging + from typing import ( List, Dict, @@ -7,6 +10,9 @@ Optional, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class DataContext: def __init__( @@ -32,3 +38,4 @@ def __init__( self.data: Dict[str, Any] = {} # the data we need to build the domain object self.exeptionStr: Optional[str] = None # if we handle exceptions as result string instead of throw self.thisTld: Dict[str, Any] = {} # the dict of regex and info as defined by ZZ and parsed by TldInfo + self.servers: List[str] = [] # extract whois servers from the whois output (may need --verbose on rfc1036/whois) diff --git a/whois/context/parameterContext.py b/whois/context/parameterContext.py index 8a8f48c..db72e62 100755 --- a/whois/context/parameterContext.py +++ b/whois/context/parameterContext.py @@ -1,5 +1,7 @@ #! /usr/bin/env python3 +import os +import logging import json from typing import ( @@ -8,6 +10,9 @@ Any, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + ParamsStringJson: str = """ { "ignore_returncode": { @@ -123,6 +128,18 @@ "default": false, "optional": true, "help": "if lib 'tld' is installed add tld info based on get_tld(); fake the tld if needed" + }, + "extractServers": { + "type": "bool", + "default": false, + "optional": true, + "help": "try to extract the whois servers from the whois output (uses --verbose)" + }, + "stripHttpStatus": { + "type": "bool", + "default": false, + "optional": true, + "help": "strip https://icann.org/epp# from status response" } } """ @@ -180,7 +197,7 @@ def validateAllMandatoryNowKnown( ) -> None: if len(mandatory) != 0: msg = f"missing mandatory parametrs: {sorted(mandatory)}" - raise Exception(msg) + raise ValueError(msg) def __init__( self, diff --git a/whois/doWhoisCommand.py b/whois/doWhoisCommand.py index bd7eb16..d5ae8d7 100755 --- a/whois/doWhoisCommand.py +++ b/whois/doWhoisCommand.py @@ -1,6 +1,8 @@ #! 
/usr/bin/env python3 -import sys +# import sys +import os +import logging from typing import ( Optional, @@ -12,6 +14,9 @@ from .context.parameterContext import ParameterContext from .context.dataContext import DataContext +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + # actually also whois uses cache, so if you really dont want to use cache # you should also pass the --force-lookup flag (on linux) @@ -34,13 +39,13 @@ def _initDefaultCache( ) -> Any: global CACHE_STUB - if pc.verbose: - print(f"DEBUG: CACHE_STUB {CACHE_STUB}", file=sys.stderr) + msg = f"CACHE_STUB {CACHE_STUB}" + log.debug(msg) # here you can override caching, if someone else already defined CACHE_STUB by this time, we use their caching if CACHE_STUB: - if pc.verbose: - print("DEBUG: cache already initialized", file=sys.stderr) + msg = "cache already initialized" + log.debug(msg) return CACHE_STUB # if no cache defined init the default cache (optional with file storage based on pc) @@ -50,8 +55,8 @@ def _initDefaultCache( cacheMaxAge=pc.cache_age, ) - if pc.verbose: - print("DEBUG: initializing default cache", file=sys.stderr) + msg = "initializing default cache" + log.debug(msg) return CACHE_STUB diff --git a/whois/domain.py b/whois/domain.py index 399098d..dd830a8 100755 --- a/whois/domain.py +++ b/whois/domain.py @@ -1,6 +1,9 @@ #! 
/usr/bin/env python3 -import sys +# import sys +import os +import re +import logging from typing import ( Any, @@ -9,43 +12,45 @@ ) from .handleDateStrings import str_to_date - from .context.parameterContext import ParameterContext from .context.dataContext import DataContext -""" -whoisdomain/domain.py:17:0: R0902: Too many instance attributes (20/7) (too-many-instance-attributes) -whoisdomain/domain.py:66:16: R1718: Consider using a set comprehension (consider-using-set-comprehension) -whoisdomain/domain.py:104:20: R1718: Consider using a set comprehension (consider-using-set-comprehension) -whoisdomain/domain.py:41:8: W0201: Attribute 'name_servers' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:53:8: W0201: Attribute 'name_servers' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:59:8: W0201: Attribute 'status' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:64:8: W0201: Attribute 'statuses' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:72:12: W0201: Attribute 'statuses' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:81:12: W0201: Attribute 'owner' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:84:12: W0201: Attribute 'abuse_contact' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:87:12: W0201: Attribute 'reseller' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:91:16: W0201: Attribute 'registrant' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:93:16: W0201: Attribute 'registrant' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:96:12: W0201: Attribute 'admin' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:102:12: W0201: Attribute 'emails' defined outside __init__ (attribute-defined-outside-init) 
-whoisdomain/domain.py:110:16: W0201: Attribute 'emails' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:118:8: W0201: Attribute 'registrar' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:119:8: W0201: Attribute 'registrant_country' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:122:8: W0201: Attribute 'creation_date' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:123:8: W0201: Attribute 'expiration_date' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:124:8: W0201: Attribute 'last_updated' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:126:8: W0201: Attribute 'dnssec' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:147:12: W0201: Attribute 'text' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:150:12: W0201: Attribute '_exception' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:161:12: W0201: Attribute 'name' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:165:12: W0201: Attribute 'tld' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:168:12: W0201: Attribute 'public_suffix' defined outside __init__ (attribute-defined-outside-init) -whoisdomain/domain.py:17:0: R0903: Too few public methods (1/2) (too-few-public-methods) -""" +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) class Domain: + # The docstrings for classes should summarize its behavior + # and list the public methods and instance variables. + """ + A class to represent a standarized result of a whois lookup + + # Attributes + * Attributes are created dynamically, + not all domains have currently the same amount. 
+ + - name: str, the domain name + - tld: str, the detected top level domain + - name_servers: List, a list of detected name servers + - DNSSEC: boolean + + - status: List + - registrar: str + - registrant: str + - registrant_country: + - emails: List + + - updated_date: datetime + - expiration_date: datetime + - creation_date: datetime + + Methods + ------- + def init(pc: ParameterContext,dc: DataContext) -> None: + initialize the object with the current data from dc.data: Dict[str, Any] + the init is separated from creating an instance as we want to use dependency injection as much as possible. + """ + def _cleanupArray( self, data: List[str], @@ -57,10 +62,11 @@ def _cleanupArray( def _doNameservers( self, - data: Dict[str, Any], + pc: ParameterContext, + dc: DataContext, ) -> None: tmp: List[str] = [] - for x in data["name_servers"]: + for x in dc.data["name_servers"]: if isinstance(x, str): tmp.append(x.strip().lower()) continue @@ -83,25 +89,45 @@ def _doNameservers( self.name_servers = sorted(self.name_servers) + def cleanStatus(self, item: str) -> str: + if "icann.org/epp#" in item: + res = re.split(r"\s*\(?https?://(www\.)?icann\.org/epp#\s*", item) + if res and res[0]: + return res[0].strip() + + if "identitydigital.au/get-au/whois-status-codes#" in item: + res = re.split(r"\s*https://identitydigital\.au/get-au/whois-status-codes#\s*", item) + if res and res[0]: + return res[0].strip() + + return item + def _doStatus( self, - data: Dict[str, Any], + pc: ParameterContext, + dc: DataContext, ) -> None: - self.status = data["status"][0].strip() + self.status = dc.data["status"][0].strip() + + if pc.stripHttpStatus: + self.status = self.cleanStatus(self.status) # sorted added to get predictable output during test - # list(set(...))) to deduplicate results + # deduplicate results with set comprehension {} self.statuses = sorted( - list( - set( - [s.strip() for s in data["status"]], - ), - ), + list({s.strip() for s in dc.data["status"]}), ) if "" in 
self.statuses: self.statuses = self._cleanupArray(self.statuses) + if pc.stripHttpStatus: + z = [] + for item in self.statuses: + item = self.cleanStatus(item) + z.append(item) + self.statuses = z + def _doOptionalFields( self, data: Dict[str, Any], @@ -131,17 +157,14 @@ def _doOptionalFields( # list(set(...))) to deduplicate results self.emails = sorted( - list( - set( - [s.strip() for s in data["emails"]], - ), - ), + list({s.strip() for s in data["emails"]}), ) if "" in self.emails: self.emails = self._cleanupArray(self.emails) def _parseData( self, + pc: ParameterContext, dc: DataContext, ) -> None: # process mandatory fields that we expect always to be present @@ -155,8 +178,8 @@ def _parseData( self.last_updated = str_to_date(dc.data["updated_date"][0], self.tld) self.dnssec = dc.data["DNSSEC"] - self._doStatus(dc.data) - self._doNameservers(dc.data) + self._doStatus(pc, dc) + self._doNameservers(pc, dc) # optional fields self._doOptionalFields(dc.data) @@ -167,7 +190,6 @@ def __init__( dc: DataContext, ) -> None: pass - # self.init(pc=pc, dc=dc) def init( self, @@ -184,8 +206,8 @@ def init( if dc.data == {}: return - if pc.verbose: - print(dc.data, file=sys.stderr) + msg = f"{dc.data}" + log.debug(msg) k = "domain_name" if k in dc.data: @@ -198,7 +220,13 @@ def init( if pc.withPublicSuffix and dc.hasPublicSuffix: self.public_suffix: str = str(dc.publicSuffixStr) + if pc.extractServers: + self.servers = dc.servers + self.server = "" + if self.servers: + self.server = self.servers[-1] + if pc.return_raw_text_for_unsupported_tld is True: return - self._parseData(dc) + self._parseData(pc, dc) diff --git a/whois/exceptions.py b/whois/exceptions.py index d13f55e..9b7979e 100755 --- a/whois/exceptions.py +++ b/whois/exceptions.py @@ -1,3 +1,10 @@ +import os +import logging + +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + + class WhoisException(Exception): # make all other exeptions based on a generic exception 
pass diff --git a/whois/handleDateStrings.py b/whois/handleDateStrings.py index 71ed89d..21cbbb6 100644 --- a/whois/handleDateStrings.py +++ b/whois/handleDateStrings.py @@ -1,14 +1,24 @@ #! /usr/bin/env python3 -import re +""" +This module isolates all date parsing in one place +str_to_date() is the only entry point +""" +import re +import os +import logging import datetime -from .exceptions import UnknownDateFormat from typing import Optional +from .exceptions import UnknownDateFormat + + +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) # http://docs.python.org/library/datetime.html#strftime-strptime-behavior -DATE_FORMATS = [ +_DATE_FORMATS = [ "%d-%b-%Y", # 02-jan-2000 "%d-%m-%Y", # 02-01-2000 "%d.%m.%Y", # 02.02.2000 @@ -67,7 +77,7 @@ "%m-%d-%Y", # 03-28-2013 # is ambivalent for all days <=12 ] -CUSTOM_DATE_FORMATS = { +_CUSTOM_DATE_FORMATS = { "ml": "%m/%d/%Y", } @@ -113,17 +123,17 @@ def str_to_date( # 07 january 2020 at 23:38:30.772 # %d %B %Y at %H:%M %S.%f - if tld and tld in CUSTOM_DATE_FORMATS: + if tld and tld in _CUSTOM_DATE_FORMATS: return ( datetime.datetime.strptime( text, - CUSTOM_DATE_FORMATS[tld], + _CUSTOM_DATE_FORMATS[tld], ) .astimezone() .replace(tzinfo=None) ) - for f in DATE_FORMATS: + for f in _DATE_FORMATS: try: z = datetime.datetime.strptime(text, f) z = z.astimezone() diff --git a/whois/helpers.py b/whois/helpers.py index eaa0a0e..bf883fa 100644 --- a/whois/helpers.py +++ b/whois/helpers.py @@ -1,3 +1,6 @@ +import os +import logging + from typing import ( Optional, List, @@ -5,15 +8,16 @@ Any, ) -from .exceptions import ( - WhoisQuotaExceeded, -) +from .exceptions import WhoisQuotaExceeded from .tldInfo import TldInfo from .version import VERSION from .tldDb.tld_regexpr import ZZ from .context.parameterContext import ParameterContext +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + def filterTldToSupportedPattern( domain: str, @@ -25,9 
+29,14 @@ def filterTldToSupportedPattern( def mergeExternalDictWithRegex( - aDict: Dict[str, Any] = {}, + aDict: Optional[Dict[str, Any]] = None, ) -> None: global tldInfo + if aDict is None: + return + if len(aDict) == 0: + return + tldInfo.mergeExternalDictWithRegex(aDict) diff --git a/whois/lastWhois.py b/whois/lastWhois.py index fd4cbb0..097c4eb 100644 --- a/whois/lastWhois.py +++ b/whois/lastWhois.py @@ -1,3 +1,15 @@ +""" +This module keeps track of the original whois string for the last query request + +it should be rewritten to use a static class or singleton +it is re-initialized on each new request + +public access is only needed fow: get_last_raw_whois_data() + +""" +import os +import logging + from typing import ( List, Dict, @@ -6,6 +18,9 @@ from .context.parameterContext import ParameterContext +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + LastWhois: Dict[str, Any] = {} diff --git a/whois/main.py b/whois/main.py index 6ebdcbc..0a5fddd 100755 --- a/whois/main.py +++ b/whois/main.py @@ -5,6 +5,7 @@ import getopt import sys import json +import logging from typing import ( Optional, @@ -14,8 +15,12 @@ Dict, ) +import whois + # import whoisdomain as whois # to be compatible with dannycork -import whois # to be compatible with dannycork + +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) # if we are not running as test2.py run in a simplistic way SIMPLISTIC: bool = False @@ -30,7 +35,10 @@ IgnoreReturncode: bool = False TestAllTld: bool = False TestRunOnly: bool = False + WithPublicSuffix: bool = False +WithExtractServers: bool = False +WithStripHttpStatus: bool = False class ResponseCleaner: @@ -139,16 +147,14 @@ def cleanupWhoisResponse( self.rDict["Preamble"].append(line) line = "PRE;" + line continue - else: - preambleSeen = True + preambleSeen = True if preambleSeen is True and percentSeen is False: if line.startswith("%"): 
self.rDict["Percent"].append(line) line = "PERCENT;" + line continue - else: - percentSeen = True + percentSeen = True if postambleSeen is False: if line.startswith("-- ") or line.startswith(">>> ") or line.startswith("Copyright notice"): @@ -183,7 +189,7 @@ def printMe(self) -> None: print(k, cr, tab, lines) k = "Body" - if len(self.rDict[k]): + if self.rDict[k]: n = 0 for lines in self.rDict[k]: ws = " [WHITESPACE AT END] " if re.search(r"[ \t]+\r?\n", lines) else "" @@ -212,12 +218,15 @@ def testItem( global IgnoreReturncode global Verbose global PrintGetRawWhoisResult - global SIMPLISTIC - global WithRedacted + global SIMPLISTIC global TestAllTld global TestRunOnly + + global WithRedacted global WithPublicSuffix + global WithExtractServers + global WithStripHttpStatus pc = whois.ParameterContext( ignore_returncode=IgnoreReturncode, @@ -227,6 +236,8 @@ def testItem( simplistic=SIMPLISTIC, withRedacted=WithRedacted, withPublicSuffix=WithPublicSuffix, + extractServers=WithExtractServers, + stripHttpStatus=WithStripHttpStatus, ) # use the new query (can also simply use q2() @@ -328,7 +339,7 @@ def getTestFileOne(fPath: str, fileData: Dict[str, Any]) -> None: fileData[bName] = [] xx = fileData[bName] - with open(fPath) as f: + with open(fPath, encoding="utf-8") as f: for index, line in enumerate(f): line = line.strip() if len(line) == 0 or line.startswith("#"): @@ -534,6 +545,8 @@ def main() -> None: global TestAllTld global TestRunOnly global WithPublicSuffix + global WithExtractServers + global WithStripHttpStatus name: str = os.path.basename(sys.argv[0]) if name == "test2.py": @@ -564,6 +577,8 @@ def main() -> None: "Cleanup=", "withRedacted", "withPublicSuffix", + "extractServers", + "stripHttpStatus", ], ) except getopt.GetoptError: @@ -600,12 +615,6 @@ def main() -> None: usage() sys.exit(0) - if opt in ("--withRedacted"): - WithRedacted = True - - if opt in ("--withPublicSuffix"): - WithPublicSuffix = True - if opt in ("-a", "--all"): TestAllTld = True @@ 
-619,6 +628,7 @@ def main() -> None: if opt in ("-v", "--verbose"): Verbose = True + logging.basicConfig(level="DEBUG") if opt in ("-p", "--print"): PrintGetRawWhoisResult = True @@ -632,7 +642,7 @@ def main() -> None: rr = makeTestAllCurrentTld(None) for item in sorted(rr): print(item) - exit(0) + sys.exit(0) if opt in ("-t", "--test"): # collect all _test entries defined and only run those, @@ -678,10 +688,22 @@ def main() -> None: if domain not in domains: domains.append(domain) - if Verbose: - print(f"{name} SIMPLISTIC: {SIMPLISTIC}", file=sys.stderr) + if opt in ("--extractServers"): + WithExtractServers = True + + if opt in ("--stripHttpStatus"): + WithStripHttpStatus = True + + if opt in ("--withRedacted"): + WithRedacted = True + + if opt in ("--withPublicSuffix"): + WithPublicSuffix = True + + msg = f"{name} SIMPLISTIC: {SIMPLISTIC}" + log.debug(msg) - if Ruleset is True and len(domains): + if Ruleset is True and domains: for domain in domains: ShowRuleset(domain) sys.exit(0) @@ -695,25 +717,25 @@ def main() -> None: showFailures() sys.exit(0) - if len(dirs): + if dirs: fileData = {} for dName in dirs: getTestFilesAll(dName, fileData) - for testFile in fileData: - testDomains(fileData[testFile]) + for testFile, x in fileData.items(): + testDomains(x) showFailures() sys.exit(0) - if len(files): + if files: fileData = {} for testFile in files: getTestFileOne(testFile, fileData) - for testFile in fileData: - testDomains(fileData[testFile]) + for testFile, x in fileData.items(): + testDomains(x) showFailures() sys.exit(0) - if len(domains): + if domains: testDomains(domains) showFailures() sys.exit(0) diff --git a/whois/processWhoisDomainRequest.py b/whois/processWhoisDomainRequest.py index 1af7dc0..198c78d 100644 --- a/whois/processWhoisDomainRequest.py +++ b/whois/processWhoisDomainRequest.py @@ -1,4 +1,6 @@ -import sys +# import sys +import os +import logging from typing import ( Optional, @@ -20,12 +22,15 @@ from .lastWhois import updateLastWhois from 
.whoisCliInterface import WhoisCliInterface +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + TLD_LIB_PRESENT: bool = False try: import tld as libTld TLD_LIB_PRESENT = True -except Exception as ee: +except ImportError as ee: _ = ee # ignore any error @@ -43,6 +48,8 @@ def __init__( self.dom: Optional[Domain] = dom self.wci = wci self.parser = parser + if self.pc.verbose: + logging.basicConfig(level="DEBUG") def _analyzeDomainStringAndValidate( self, @@ -64,8 +71,8 @@ def _internationalizedDomainNameToPunyCode(d: List[str]) -> List[str]: if res: self.dc.publicSuffixStr = str(res) self.dc.hasPublicSuffix = True - if self.pc.verbose: - print(f"publicSuffixStr: {self.dc.publicSuffixStr}", file=sys.stderr) + msg = f"publicSuffixStr: {self.dc.publicSuffixStr}" + log.debug(msg) if len(self.dc.dList) == 0: self.dc.tldString = None @@ -143,8 +150,8 @@ def _doUnsupportedTldAnyway( def _doOneLookup( self, ) -> Tuple[Optional[Domain], bool]: - if self.pc.verbose: - print(f"DEBUG: ### lookup: tldString: {self.dc.tldString}; dList: {self.dc.dList}", file=sys.stderr) + msg = f"### lookup: tldString: {self.dc.tldString}; dList: {self.dc.dList}" + log.debug(msg) if self.dc.dList is None: # mainly to please mypy self.dom = None @@ -171,8 +178,8 @@ def _doOneLookup( self.dc.whoisStr = str(self.dc.whoisStr) - if self.pc.verbose: - print("DEBUG: Raw: ", self.dc.whoisStr, file=sys.stderr) + msg = f"Raw: {self.dc.whoisStr}" + log.debug(msg) self.dc.rawWhoisStr = self.dc.whoisStr # keep the original whois string for reference before we clean updateLastWhois( @@ -183,21 +190,23 @@ def _doOneLookup( self.parser.init() # init also calls cleanup on the text string whois cli response - if self.pc.verbose: - print("DEBUG: Clean: ", self.dc.whoisStr, file=sys.stderr) + msg = f"Clean: {self.dc.whoisStr}" + log.debug(msg) assert self.dom is not None data, finished = self.parser.parse( dom=self.dom, ) - self.dom = data + if finished: + self.dom = 
data + return data, finished def _prepRequest(self) -> bool: try: self._analyzeDomainStringAndValidate() # may raise UnknownTld - except Exception as e: + except UnknownTld as e: if self.pc.simplistic is False: raise e @@ -221,7 +230,11 @@ def _prepRequest(self) -> bool: return True # ================================================= - if self.dc.tldString not in get_TLD_RE().keys(): + myKeys: List[str] = [] + for item in get_TLD_RE(): + myKeys.append(item) + + if self.dc.tldString not in myKeys: msg = self._makeMessageForUnsupportedTld() if msg is None: self._doUnsupportedTldAnyway() @@ -288,8 +301,11 @@ def processRequest(self) -> Optional[Domain]: tldLevel = str(self.dc.tldString).split(".") while len(self.dc.dList) > len(tldLevel): - self.dom, finished = self._doOneLookup() + log.debug(f"{self.dc.dList}") + z, finished = self._doOneLookup() + if finished: + self.dom = z return self.dom self.dc.dList = self.dc.dList[1:] # strip one element from the front and try again diff --git a/whois/strings/ignoreStrings.py b/whois/strings/ignoreStrings.py index 8eda720..81d3f7b 100644 --- a/whois/strings/ignoreStrings.py +++ b/whois/strings/ignoreStrings.py @@ -1,7 +1,13 @@ +import os +import logging + + from typing import ( List, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) IGNORESTRINGS: List[str] = [ "", diff --git a/whois/strings/noneStrings.py b/whois/strings/noneStrings.py index 9d0098d..3c0b81b 100644 --- a/whois/strings/noneStrings.py +++ b/whois/strings/noneStrings.py @@ -1,7 +1,13 @@ +import os +import logging + from typing import ( List, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + NONESTRINGS: List[str] = [ "the domain has not been registered", diff --git a/whois/strings/quotaStrings.py b/whois/strings/quotaStrings.py index c92bfe3..c21122e 100644 --- a/whois/strings/quotaStrings.py +++ b/whois/strings/quotaStrings.py @@ -1,7 +1,12 @@ +import os 
+import logging + from typing import ( List, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) QUOTASTRINGS: List[str] = [ "limit exceeded", diff --git a/whois/tldDb/finders.py b/whois/tldDb/finders.py index a0f2d66..07a0381 100644 --- a/whois/tldDb/finders.py +++ b/whois/tldDb/finders.py @@ -1,5 +1,8 @@ +import os import re -import sys +import logging + +# pylint: disable=unused-argument from typing import ( # Dict, @@ -8,6 +11,9 @@ Callable, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + def newLineSplit( ignoreCase: bool = True, @@ -18,7 +24,7 @@ def xNewlineSplit( ) -> List[str]: # split the incoming text on newlines \n\n what = r"\n\n" - return re.split(what, whoisStr, flags=re.IGNORECASE if ignoreCase else 0) # NOFLAG is 3.11 + return re.split(what, whoisStr, flags=re.IGNORECASE if ignoreCase else 0) # NOFLAG is 3.11 return xNewlineSplit @@ -33,7 +39,7 @@ def reFindAll( sData: List[str], verbose: bool = False, ) -> List[str]: - flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 + flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 return re.findall(reStr, textStr, flags=flags) return reFindAll @@ -60,18 +66,19 @@ def xFindFromToAndLookFor( sData: List[str], verbose: bool = False, ) -> List[str]: - flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 + flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 s1 = re.search(fromStr, textStr, flags=flags) - if verbose: - print(f"DEBUG s1 {s1}, {fromStr}", file=sys.stderr) + + msg = f"s1 {s1}, {fromStr}" + log.debug(msg) if s1 is None: return [] start = s1.start() t2 = textStr[start:] - if verbose: - print(f"DEBUG: fromStr {t2}", file=sys.stderr) + msg = f"fromStr {t2}" + log.debug(msg) s2 = re.search(toStr, t2, flags=flags) if s2 is None: @@ -79,14 +86,15 @@ def xFindFromToAndLookFor( end = s2.end() t3 = t2[:end] - if verbose: - print(f"DEBUG: toStr {t3}", file=sys.stderr) + msg = 
f"toStr {t3}" + log.debug(msg) return re.findall(lookForStr, t3, flags=flags) return xFindFromToAndLookFor +# pylint disable=pointless-string-statement r""" example look for in context: google.sk look for Organization:\s*([^\n]*)\n @@ -114,7 +122,7 @@ def xFindFromToAndLookFor( lookForStr=r"Organization:\s*([^\n]*)\n" ) test with: ./test2.py -v -d google.sk 2>2 -""" +""" # pylint disable=pointless-string-statement def findFromToAndLookForWithFindFirst( @@ -136,7 +144,7 @@ def xFindFromToAndLookForWithFindFirst( sData: List[str], verbose: bool = False, ) -> List[str]: - flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 + flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 ff = re.findall(findFirst, textStr, flags=flags) if ff is None or ff == []: @@ -146,22 +154,22 @@ def xFindFromToAndLookForWithFindFirst( if ff2 == "": return [] - if verbose: - print(f"DEBUG: we found: {ff2}, now combine with {fromStr}", file=sys.stderr) + msg = f"we found: {ff2}, now combine with {fromStr}" + log.debug(msg) fromStr2 = fromStr.replace(r"{}", ff2) s1 = re.search(fromStr2, textStr, flags=flags) - if verbose: - print(f"DEBUG s1 {s1}, {fromStr}", file=sys.stderr) + msg = f"s1 {s1}, {fromStr}" + log.debug(msg) if s1 is None: return [] start = s1.start() t2 = textStr[start:] - if verbose: - print(f"DEBUG: fromStr {t2}", file=sys.stderr) + msg = f"fromStr {t2}" + log.debug(msg) s2 = re.search(toStr, t2, flags=flags) if s2 is None: @@ -169,8 +177,8 @@ def xFindFromToAndLookForWithFindFirst( end = s2.end() t3 = t2[:end] - if verbose: - print(f"DEBUG: toStr {t3}", file=sys.stderr) + msg = f"toStr {t3}" + log.debug(msg) return re.findall(lookForStr, t3, flags=flags) @@ -197,7 +205,7 @@ def xfindInSplitedLookForHavingFindFirst( sData: List[str], verbose: bool = False, ) -> List[str]: - flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 + flags = re.IGNORECASE if ignoreCase else 0 # NOFLAG is 3.11 ff = re.findall(findFirst, textStr, flags=flags) if ff is None or ff 
== []: diff --git a/whois/tldDb/groupers.py b/whois/tldDb/groupers.py index d6bbd8e..ce71529 100644 --- a/whois/tldDb/groupers.py +++ b/whois/tldDb/groupers.py @@ -1,5 +1,7 @@ -import re -import sys +# import re +# import sys +import os +import logging from typing import ( Dict, @@ -7,6 +9,10 @@ Callable, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + + COM_LIST: List[str] = [ r"\nRegistrar", r"\nRegistrant", @@ -17,7 +23,7 @@ ] -def groupFromList(list: List[str]) -> Callable[[str], Dict[str, str]]: +def groupFromList(aList: List[str]) -> Callable[[str], Dict[str, str]]: def xgroupFromList( whoisStr: str, verbose: bool = False, @@ -28,7 +34,7 @@ def xgroupFromList( # create a empty list # store the list under key # see if there is a match ans append matched lines to the list - what = r"\n\n" + # what = r"\n\n" return result return xgroupFromList diff --git a/whois/tldDb/tld_regexpr.py b/whois/tldDb/tld_regexpr.py index 778e216..c33814b 100644 --- a/whois/tldDb/tld_regexpr.py +++ b/whois/tldDb/tld_regexpr.py @@ -1,3 +1,7 @@ +# pylint: disable=C0302 +import os +import logging + from typing import ( Dict, Any, @@ -12,6 +16,9 @@ findInSplitedLookForHavingFindFirst, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + # 2023-09-03 mboot, all _items are inherited, confirmed # only _ as meta domains do net end up in the database @@ -1109,6 +1116,8 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: "status": R(r"Domain Status:\s(.+)"), "name_servers": R(r"Name Server:\s+(.+)"), "registrant_country": None, + "_server": "whois.website.ws", + "_test": "website.ws", } ZZ["re"] = { @@ -1358,6 +1367,12 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: "status": R(r"Status:\.+\s?(.+)"), "name_servers": R(r"Name Server \(DB\):\.+(.+)"), } +ZZ["au"] = { + "extend": "com", + "registrar": R(r"Registrar Name:\s?(.+)"), + 
"updated_date": R(r"Last Modified:([^\n]*)"), + "registrant": r"Registrant:\s*([^\n]*)\n", +} # ====================================== # ====================================== @@ -1432,7 +1447,6 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["audible"] = {"_server": "whois.nic.audible", "extend": "com", "_test": "nic.audible"} ZZ["audio"] = {"extend": "_uniregistry", "_server": "whois.uniregistry.net"} ZZ["audi"] = {"_server": "whois.afilias-srs.net", "extend": "com"} -ZZ["au"] = {"extend": "com", "registrar": R(r"Registrar Name:\s?(.+)"), "updated_date": R(r"Last Modified:([^\n]*)")} ZZ["auspost"] = {"_server": "whois.nic.auspost", "extend": "com", "_test": "nic.auspost"} ZZ["author"] = {"_server": "whois.nic.author", "extend": "com", "_test": "nic.author"} ZZ["auto"] = {"extend": "_centralnic", "_server": "whois.centralnic.com"} @@ -1536,12 +1550,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["ca.ug"] = {"extend": "ug"} ZZ["cba"] = {"_server": "whois.nic.cba", "extend": "com", "_test": "nic.cba"} ZZ["cbs"] = {"_server": "whois.afilias-srs.net", "extend": "com"} -ZZ["cd"] = { - "extend": "ac", - "_server": "whois.nic.cd", - "registrant_country": R(r"Registrant\s+Country:\s+(.+)"), - "_test": "nic.cd", -} +ZZ["cd"] = {"extend": "ac", "_server": "whois.nic.cd", "registrant_country": R(r"Registrant\s+Country:\s+(.+)"), "_test": "nic.cd"} ZZ["center"] = {"extend": "_donuts", "_server": "whois.donuts.co"} ZZ["ceo"] = {"extend": "_centralnic", "_server": "whois.centralnic.com"} ZZ["cern"] = {"_server": "whois.afilias-srs.net", "extend": "com"} @@ -1679,11 +1688,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["dog"] = {"extend": "_donuts", "_server": "whois.donuts.co"} ZZ["domains"] = {"extend": "_donuts", "_server": "whois.donuts.co"} ZZ["dot"] = {"_server": "whois.nic.dot", "extend": "com", "_test": "nic.dot"} -ZZ["download"] = { - "extend": "amsterdam", - "name_servers": 
R(r"Name Server:[ \t]+(\S+)"), - "status": R(r"Domain Status:\s*([a-zA-z]+)"), -} +ZZ["download"] = {"extend": "amsterdam", "name_servers": R(r"Name Server:[ \t]+(\S+)"), "status": R(r"Domain Status:\s*([a-zA-z]+)")} ZZ["drive"] = {"_server": "whois.nic.google", "extend": "com"} ZZ["dtv"] = {"_server": "whois.nic.dtv", "extend": "com", "_test": "nic.dtv"} ZZ["dubai"] = {"_server": "whois.nic.dubai", "extend": "com", "_test": "nic.dubai"} @@ -1794,13 +1799,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["genting"] = {"_server": "whois.nic.genting", "extend": "com", "_test": "nic.genting"} ZZ["geo.jp"] = {"extend": "co.jp"} ZZ["george"] = {"_server": "whois.nic.george", "extend": "com", "_test": "nic.george"} -ZZ["ge"] = { - "_server": "whois.nic.ge", - "extend": "ac", - "updated_date": None, - "_test": "nic.ge", - "registrant": R(r"Registrant:\s*([^\n]*)\n"), -} +ZZ["ge"] = {"_server": "whois.nic.ge", "extend": "ac", "updated_date": None, "_test": "nic.ge", "registrant": R(r"Registrant:\s*([^\n]*)\n")} ZZ["gf"] = {"extend": "si", "_server": "whois.mediaserv.net"} ZZ["ggee"] = {"_server": "whois.nic.ggee", "extend": "com", "_test": "nic.ggee"} ZZ["gh"] = {"_privateRegistry": True} @@ -1945,11 +1944,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["kfh"] = {"_server": "whois.nic.kfh", "extend": "com", "_test": "nic.kfh"} ZZ["kia"] = {"_server": "whois.nic.kia", "extend": "com", "_test": "nic.kia"} ZZ["kids"] = {"_server": "whois.afilias-srs.net", "extend": "com"} -ZZ["ki"] = { - "extend": "com", - "_server": "whois.nic.ki", - "_test": None, -} # kiribati never answeres, timout is the normal response +ZZ["ki"] = {"extend": "com", "_server": "whois.nic.ki", "_test": None} # kiribati never answeres, timout is the normal response ZZ["kim"] = {"_server": "whois.nic.kim", "extend": "com", "_test": "nic.kim"} ZZ["kindle"] = {"_server": "whois.nic.kindle", "extend": "com", "_test": "nic.kindle"} ZZ["kitchen"] = 
{"extend": "_donuts", "_server": "whois.donuts.co"} @@ -2019,12 +2014,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["luxe"] = {"_server": "whois.nic.luxe", "extend": "com", "_test": "nic.luxe"} ZZ["luxury"] = {"extend": "_centralnic", "_server": "whois.centralnic.com"} ZZ["lviv.ua"] = {"extend": "com"} -ZZ["ly"] = { - "extend": "ac", - "_server": "whois.nic.ly", - "registrant_country": R(r"Registrant\s+Country:\s+(.+)"), - "_test": "nic.ly", -} +ZZ["ly"] = {"extend": "ac", "_server": "whois.nic.ly", "registrant_country": R(r"Registrant\s+Country:\s+(.+)"), "_test": "nic.ly"} ZZ["madrid"] = {"_server": "whois.nic.madrid", "extend": "com", "_test": "nic.madrid"} ZZ["ma"] = {"extend": "ac", "_server": "whois.registre.ma", "registrar": R(r"Sponsoring Registrar:\s*(.+)")} ZZ["maison"] = {"extend": "_donuts", "_server": "whois.donuts.co"} @@ -2058,11 +2048,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["mma"] = {"_server": "whois.nic.mma", "extend": "com", "_test": "nic.mma"} ZZ["mn"] = {"extend": "com"} ZZ["mn"] = {"extend": "com", "_server": "whois.nic.mn", "_test": "nic.mn"} -ZZ["mobi"] = { - "extend": "com", - "expiration_date": R(r"\nRegistry Expiry Date:\s?(.+)"), - "updated_date": R(r"\nUpdated Date:\s?(.+)"), -} +ZZ["mobi"] = {"extend": "com", "expiration_date": R(r"\nRegistry Expiry Date:\s?(.+)"), "updated_date": R(r"\nUpdated Date:\s?(.+)")} ZZ["mobi.ke"] = {"extend": "ke"} ZZ["mobile"] = {"_server": "whois.nic.mobile", "extend": "com", "_test": "nic.mobile"} ZZ["moda"] = {"extend": "_donuts", "_server": "whois.donuts.co"} @@ -2394,12 +2380,7 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["taxi"] = {"extend": "_donuts", "_server": "whois.donuts.co"} ZZ["tci"] = {"_server": "whois.nic.tci", "extend": "com", "_test": "nic.tci"} ZZ["tdk"] = {"_server": "whois.nic.tdk", "extend": "com", "_test": "nic.tdk"} -ZZ["td"] = { - "_server": "whois.nic.td", - "extend": 
"ac", - "registrant_country": R(r"Registrant Country:\s+(.+)"), - "_test": "nic.td", -} +ZZ["td"] = {"_server": "whois.nic.td", "extend": "ac", "registrant_country": R(r"Registrant Country:\s+(.+)"), "_test": "nic.td"} ZZ["team"] = {"extend": "_donuts", "_server": "whois.donuts.co"} ZZ["tech"] = {"extend": "_centralnic", "_server": "whois.centralnic.com"} ZZ["technology"] = {"extend": "_donuts", "_server": "whois.donuts.co"} @@ -2618,23 +2599,11 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["xn--tckwe"] = {"_server": "whois.nic.xn--tckwe", "extend": "com", "_test": "nic.xn--tckwe"} ZZ["xn--tiq49xqyj"] = {"_server": "whois.nic.xn--tiq49xqyj", "extend": "com", "_test": "nic.xn--tiq49xqyj"} ZZ["xn--unup4y"] = {"_server": "whois.nic.xn--unup4y", "extend": "com", "_test": "nic.xn--unup4y"} -ZZ["xn--vermgensberater-ctb"] = { - "_server": "whois.nic.xn--vermgensberater-ctb", - "extend": "com", - "_test": "nic.xn--vermgensberater-ctb", -} -ZZ["xn--vermgensberatung-pwb"] = { - "_server": "whois.nic.xn--vermgensberatung-pwb", - "extend": "com", - "_test": "nic.xn--vermgensberatung-pwb", -} +ZZ["xn--vermgensberater-ctb"] = {"_server": "whois.nic.xn--vermgensberater-ctb", "extend": "com", "_test": "nic.xn--vermgensberater-ctb"} +ZZ["xn--vermgensberatung-pwb"] = {"_server": "whois.nic.xn--vermgensberatung-pwb", "extend": "com", "_test": "nic.xn--vermgensberatung-pwb"} ZZ["xn--vhquv"] = {"_server": "whois.nic.xn--vhquv", "extend": "com", "_test": "nic.xn--vhquv"} ZZ["xn--vuq861b"] = {"extend": "_teleinfo", "_server": "whois.teleinfo.cn"} -ZZ["xn--w4r85el8fhu5dnra"] = { - "_server": "whois.nic.xn--w4r85el8fhu5dnra", - "extend": "com", - "_test": "nic.xn--w4r85el8fhu5dnra", -} +ZZ["xn--w4r85el8fhu5dnra"] = {"_server": "whois.nic.xn--w4r85el8fhu5dnra", "extend": "com", "_test": "nic.xn--w4r85el8fhu5dnra"} ZZ["xn--w4rs40l"] = {"_server": "whois.nic.xn--w4rs40l", "extend": "com", "_test": "nic.xn--w4rs40l"} ZZ["xn--wgbl6a"] = {"_server": 
"whois.registry.qa", "extend": "qa", "_test": "registry.qa"} ZZ["xn--xhq521b"] = {"_server": "whois.ngtld.cn", "extend": "com"} @@ -2948,6 +2917,8 @@ def xStr(what: str, times: int = 1, firstMandatory: bool = True) -> str: ZZ["yandex"] = {"_privateRegistry": True} # no whois server found in iana ZZ["zero"] = {"_privateRegistry": True} # no whois server found in iana +ZZ["onion"] = {"_privateRegistry": True} # this is a special case https://tools.ietf.org/html/rfc7686 + # unknown tld abb, abb, abb, abb, whois.nic.abb, # unknown tld arpa, arpa, arpa, arpa, whois.iana.org, # unknown tld bn, bn, bn, bn, whois.bnnic.bn, diff --git a/whois/tldInfo.py b/whois/tldInfo.py index 4864f67..95148d2 100644 --- a/whois/tldInfo.py +++ b/whois/tldInfo.py @@ -1,4 +1,6 @@ # import re +import os +import logging from typing import ( Dict, @@ -7,6 +9,9 @@ Optional, ) +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class TldInfo: def __init__( @@ -15,6 +20,8 @@ def __init__( verbose: bool = False, ) -> None: self.verbose = verbose + if verbose: + logging.basicConfig(level="DEBUG") # a reference to the external ZZ database of all TLD info self.zzDictRef = zzDict @@ -111,8 +118,14 @@ def filterTldToSupportedPattern( def mergeExternalDictWithRegex( self, - aDict: Dict[str, Any] = {}, + aDict: Optional[Dict[str, Any]] = None, ) -> None: + if aDict is None: + return + + if len(aDict) == 0: + return + # merge in ZZ, this extends ZZ with new tld's and overrides existing tld's for tld in aDict.keys(): self.zzDictRef[tld] = aDict[tld] diff --git a/whois/version.py b/whois/version.py index 62d5cc3..5edf3ef 100755 --- a/whois/version.py +++ b/whois/version.py @@ -1 +1,2 @@ -VERSION = "1.20230913.1" +"""This module only makes the version available for dynamic versioning""" +VERSION = "1.20230917.1" diff --git a/whois/whoisCliInterface.py b/whois/whoisCliInterface.py index 4ed343d..a174ead 100755 --- a/whois/whoisCliInterface.py +++ 
b/whois/whoisCliInterface.py @@ -2,23 +2,28 @@ import subprocess import time -import sys + +# import sys import os import platform import shutil +import logging + +from typing import ( + List, +) from .exceptions import ( WhoisCommandFailed, WhoisCommandTimeout, ) -from typing import ( - List, -) - from .context.parameterContext import ParameterContext from .context.dataContext import DataContext +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + class WhoisCliInterface: def _specificOnNonWindowsPlatforms(self) -> None: @@ -42,8 +47,8 @@ def _tryInstallMissingWhoisOnWindows(self) -> None: """ folder = os.getcwd() copy_command = r"copy \\live.sysinternals.com\tools\whois.exe " + folder - if self.pc.verbose: - print("DEBUG: downloading dependencies: {copy_command}", file=sys.stderr) + msg = "downloading dependencies: {copy_command}" + log.debug(msg) subprocess.call( copy_command, @@ -90,13 +95,17 @@ def _makeWhoisCommandToRun(self) -> List[str]: whoisCommandList=whoisCommandList, ) + if self.pc.extractServers: + whoisCommandList = whoisCommandList + ["--verbose"] + if self.pc.server: - return whoisCommandList + [self.domain, "-h", self.pc.server] + whoisCommandList = whoisCommandList + ["-h", self.pc.server] + return whoisCommandList + [self.domain] def _postProcessingResult(self) -> str: - if self.pc.verbose: - print(f"DEBUG: {self.rawWhoisResultString}", file=sys.stderr) + msg = f"{self.rawWhoisResultString}" + log.debug(msg) if self.pc.ignore_returncode is False and self.processHandle.returncode not in [0, 1]: if "fgets: Connection reset by peer" in self.rawWhoisResultString: @@ -123,14 +132,14 @@ def _runWhoisCliOnThisOs(self) -> str: env={"LANG": "en"} if self.domain.endswith(".jp") else None, ) as self.processHandle: - if self.pc.verbose: - print(f"DEBUG: timout: {self.pc.timeout}", file=sys.stderr) + msg = f"timout: {self.pc.timeout}" + log.debug(msg) try: self.rawWhoisResultString = 
self.processHandle.communicate(timeout=self.pc.timeout,)[ 0 ].decode(errors="ignore") - except subprocess.TimeoutExpired: + except subprocess.TimeoutExpired as ex: # Kill the child process & flush any output buffers self.processHandle.kill() self.rawWhoisResultString = self.processHandle.communicate()[0].decode(errors="ignore") @@ -139,7 +148,7 @@ def _runWhoisCliOnThisOs(self) -> str: # Add this option to cover those cases if not self.pc.parse_partial_response or not self.rawWhoisResultString: msg = f"timeout: query took more then {self.pc.timeout} seconds" - raise WhoisCommandTimeout(msg) + raise WhoisCommandTimeout(msg) from ex return self._postProcessingResult() diff --git a/whois/whoisParser.py b/whois/whoisParser.py index fddfcdf..c7d3eeb 100755 --- a/whois/whoisParser.py +++ b/whois/whoisParser.py @@ -1,4 +1,5 @@ #! /usr/bin/env python3 +# pylint: disable=duplicate-code from typing import ( Any, @@ -11,7 +12,10 @@ ) import re -import sys +import os +import logging + +# import sys from .exceptions import ( FailedParsingWhoisOutput, @@ -29,6 +33,10 @@ from .helpers import get_TLD_RE +log = logging.getLogger(__name__) +logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) + + class WhoisParser: def __init__( self, @@ -38,6 +46,9 @@ def __init__( self.pc = pc self.dc = dc self.dom: Optional[Domain] = None + self.resultDict: Dict[str, Any] = {} + if self.pc.verbose: + logging.basicConfig(level="DEBUG") def _doExtractPattensIanaFromWhoisString( self, @@ -55,8 +66,8 @@ def _doExtractPattensIanaFromWhoisString( continue self.resultDict[key] = result - if self.pc.verbose: - print(f"DEBUG: parsing iana data only for tld: {self.dc.tldString}, {result}", file=sys.stderr) + msg = f"parsing iana data only for tld: {self.dc.tldString}, {result}" + log.debug(msg) def _doExtractPattensFromWhoisString_old( self, @@ -72,8 +83,8 @@ def _doExtractPattensFromWhoisString_old( if compiledRe: # here we apply the regex patterns self.resultDict[key] = 
compiledRe.findall(self.dc.whoisStr) or empty - if self.pc.verbose: - print(f"{key}, {self.resultDict[key]}", file=sys.stderr) + msg = f"{key}, {self.resultDict[key]}" + log.debug(msg) def _doExtractPattensFromWhoisString( self, @@ -84,9 +95,10 @@ def _doExtractPattensFromWhoisString( splitter = self.dc.thisTld.get("_split") if splitter: sData = splitter(self.dc.whoisStr, self.pc.verbose) - if self.pc.verbose and sData != []: + if sData != []: for item in sData: - print("DEBUG: split data", item, file=sys.stderr) + msg = f"split data: {item}" + log.debug(msg) for key, val in self.dc.thisTld.items(): if key.startswith("_"): @@ -100,22 +112,19 @@ def _doExtractPattensFromWhoisString( if callable(val): # vcall the curry function we created in tld_regexpr.py self.resultDict[key] = val(self.dc.whoisStr, sData, self.pc.verbose) or empty - if self.pc.verbose: - print( - f"DEBUG: _doExtractPattensFromWhoisString: call indirect {val} {key}, {self.resultDict[key]}", - file=sys.stderr, - ) + msg = f"_doExtractPattensFromWhoisString: call indirect {val} {key}, {self.resultDict[key]}" + log.debug(msg) continue if isinstance(val, str): # we still support plain strings also self.resultDict[key] = re.findall(val, self.dc.whoisStr, flags=re.IGNORECASE) or empty - if self.pc.verbose: - print(f"DEBUG _doExtractPattensFromWhoisStringstr: {key}, {self.resultDict[key]}", file=sys.stderr) + msg = f"_doExtractPattensFromWhoisStringstr: {key}, {self.resultDict[key]}" + log.debug(msg) continue - if self.pc.verbose: - print(f"DEBUG: UNKNOWN: _doExtractPattensFromWhoisString {key}, {val}", file=sys.stderr) + msg = f"UNKNOWN: _doExtractPattensFromWhoisString {key}, {val}" + log.debug(msg) def _doSourceIana( self, @@ -125,9 +134,8 @@ def _doSourceIana( # here we can handle the example.com and example.net permanent IANA domains k: str = "source: IANA" - if self.pc.verbose: - msg: str = f"DEBUG: i have seen {k}" - print(msg, file=sys.stderr) + msg: str = f"i have seen {k}" + log.debug(msg) 
whois_splitted: List[str] = self.dc.whoisStr.split(k) z: int = len(whois_splitted) @@ -138,9 +146,8 @@ def _doSourceIana( if z == 2 and whois_splitted[1].strip() != "": # if we see source: IANA and the part after is not only whitespace - if self.pc.verbose: - msg = f"DEBUG: after: {k} we see not only whitespace: {whois_splitted[1]}" - print(msg, file=sys.stderr) + msg = f"after: {k} we see not only whitespace: {whois_splitted[1]}" + log.debug(msg) self.dc.whoisStr = whois_splitted[1] self.dom = None @@ -169,9 +176,8 @@ def _doIfServerNameLookForDomainName(self) -> None: if not re.findall(r"Server Name:\s?(.+)", self.dc.whoisStr, re.IGNORECASE): return - if self.pc.verbose: - msg = "DEBUG: i have seen Server Name:, looking for Domain Name:" - print(msg, file=sys.stderr) + msg = "i have seen Server Name:, looking for Domain Name:" + log.debug(msg) # this changes the whoisStr, we may want to keep the original as extra self.dc.whoisStr = self.dc.whoisStr[self.dc.whoisStr.find("Domain Name:") :] @@ -184,9 +190,8 @@ def _doDnsSec( whoisDnsSecList: List[str] = self.dc.whoisStr.split("DNSSEC:") if len(whoisDnsSecList) >= 2: - if self.pc.verbose: - msg = "DEGUG: i have seen dnssec: {whoisDnsSecStr}" - print(msg, file=sys.stderr) + msg = "DEGUG: i have seen dnssec: {whoisDnsSecStr}" + log.debug(msg) whoisDnsSecStr: str = whoisDnsSecList[1].split("\n")[0] if whoisDnsSecStr.strip() == "signedDelegation" or whoisDnsSecStr.strip() == "yes": @@ -201,8 +206,8 @@ def _handleShortResponse( self.dom = None return self.dom - if self.pc.verbose: - print(f"DEBUG: shortResponse:: {self.dc.tldString} {self.dc.whoisStr}", file=sys.stderr) + msg = f"shortResponse:: {self.dc.tldString} {self.dc.whoisStr}" + log.debug(msg) # TODO: some short responses are actually valid: # lookfor Domain: and Status but all other fields are missing so the regexec could fail @@ -226,8 +231,8 @@ def _handleShortResponse( # --------------------------------- # is there any error string in the result if 
s.count("error"): - if self.pc.verbose: - print("DEBUG: i see 'error' in the result, return: None", file=sys.stderr) + msg = "i see 'error' in the result, return: None" + log.debug(msg) self.dom = None return self.dom @@ -261,6 +266,18 @@ def _handleShortResponse( raise FailedParsingWhoisOutput(self.dc.whoisStr) + def _extractWhoisServer(self) -> List[str]: + # jp starts comments with [\s + result = re.findall(r"^Using\s+server\s+([^\n]*)\n", str(self.dc.whoisStr), flags=re.IGNORECASE) + if result: + return result + + result = re.findall(r"\[(?=[^\s])([^\]]*)\]\r?\n", str(self.dc.whoisStr), flags=re.IGNORECASE) + if result: + return result + + return [] + def _cleanupWhoisResponse( self, ) -> str: @@ -333,25 +350,27 @@ def doSlowdownHintForThisTld( if self.pc.slow_down == 0 and int(slowDown) > 0: self.pc.slow_down = int(slowDown) - if self.pc.verbose and int(self.pc.slow_down): - print(f"DEBUG: using _slowdown hint {self.pc.slow_down} for tld: {self.dc.tldString}", file=sys.stderr) + if int(self.pc.slow_down): + msg = f"using _slowdown hint {self.pc.slow_down} for tld: {self.dc.tldString}" + log.debug(msg) return int(self.pc.slow_down) def getThisTld(self, tldString: str) -> None: self.dc.thisTld = get_TLD_RE().get(tldString, {}) - if self.pc.verbose: - print(self.dc.thisTld, file=sys.stderr) + msg = f"{self.dc.thisTld}" + log.debug(msg) def init( self, ) -> None: self.dc.whoisStr = str(self.dc.whoisStr) - self.resultDict: Dict[str, Any] = { + self.resultDict = { "tld": str(self.dc.tldString), "DNSSEC": self._doDnsSec(), } + self.dc.servers = self._extractWhoisServer() self._cleanupWhoisResponse() def parse(