Added logfile and retry delay options
Ivan Begtin committed Nov 1, 2021
1 parent 5250a8e commit 940c47b
Showing 8 changed files with 111 additions and 56 deletions.
10 changes: 10 additions & 0 deletions HISTORY.rst
@@ -3,7 +3,17 @@
History
=======

1.0.6 (2021-11-01)
------------------
* Added "default_delay", "retry_delay" and "retry_count" options to manage error handling
* Added "logfile" option; log output now goes both to the console and to a logfile (default "apibackuper.log")
* On HTTP status 500 or 503, the latest request is retried until it succeeds or "retry_count" attempts are exhausted

1.0.5 (2021-05-31)
------------------
* Minor fixes

1.0.4 (2021-05-31)
------------------
* Added "start_page" option for APIs whose first page number is not 1 (it can sometimes be 0)
* Added support for data returned as a JSON array rather than a JSON dict when data_key is not provided
* Added initial code to implement Frictionless Data packaging
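
The new options from the 1.0.6 entry, including "logfile", are read from the project's apibackuper.cfg, an INI file parsed with configparser. A minimal sketch of a config enabling them; the section and option names come from this commit's code, while the values and project name are illustrative:

    [settings]
    name = example-project
    logfile = apibackuper.log

    [project]
    default_delay = 1
    retry_delay = 5
    retry_count = 5
    force_retry = False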
2 changes: 1 addition & 1 deletion apibackuper/__init__.py
@@ -3,6 +3,6 @@
"""

-__version__ = '1.0.5'
+__version__ = '1.0.6'
__author__ = 'Ivan Begtin'
__licence__ = 'MIT'
138 changes: 86 additions & 52 deletions apibackuper/cmds/project.py
@@ -4,8 +4,6 @@
import logging
import os
import csv
-import urllib.request
-import shutil
import time
from timeit import default_timer as timer
from zipfile import ZipFile, ZIP_DEFLATED
@@ -19,14 +17,9 @@
pass

from ..common import get_dict_value, set_dict_value, update_dict_values
-from ..constants import DEFAULT_DELAY, FIELD_SPLITTER
+from ..constants import DEFAULT_DELAY, FIELD_SPLITTER, DEFAULT_RETRY_COUNT, DEFAULT_TIMEOUT, PARAM_SPLITTER, FILE_SIZE_DOWNLOAD_LIMIT, DEFAULT_ERROR_STATUS_CODES, RETRY_DELAY
from ..storage import FilesystemStorage, ZipFileStorage


-FILE_SIZE_DOWNLOAD_LIMIT = 270000000
-DEFAULT_TIMEOUT = 10
-PARAM_SPLITTER = ';'

def load_file_list(filename, encoding='utf8'):
    """Reads a file and returns its lines as a list of strings"""
flist = []
@@ -69,7 +62,21 @@ def __init__(self, project_path=None):
self.project_path = os.getcwd() if project_path is None else project_path
self.config_filename = os.path.join(self.project_path, 'apibackuper.cfg')
self.__read_config(self.config_filename)
-pass
+self.enable_logging()


def enable_logging(self):
    """Enable logging to the configured logfile and to stderr"""
    logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
    rootLogger = logging.getLogger()

    # Write every log record to the project logfile...
    fileHandler = logging.FileHandler(self.logfile)
    fileHandler.setFormatter(logFormatter)
    rootLogger.addHandler(fileHandler)

    # ...and mirror it to the console
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    rootLogger.addHandler(consoleHandler)
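
A caveat with this setup: each call to enable_logging() attaches a fresh pair of handlers to the root logger, so records would be emitted twice if the method ever ran twice in one process (for example, with two ProjectBuilder instances). A defensive variant, shown only as a sketch and not part of this commit, could guard against that:

    def enable_logging(self):
        """Enable logging to the configured logfile and to the console, once"""
        root_logger = logging.getLogger()
        if root_logger.handlers:
            # Handlers already attached, avoid duplicating every record
            return
        formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
        for handler in (logging.FileHandler(self.logfile), logging.StreamHandler()):
            handler.setFormatter(formatter)
            root_logger.addHandler(handler)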

def __read_config(self, filename):
self.config = None
@@ -84,6 +91,7 @@ def __read_config(self, filename):
'splitter') else FIELD_SPLITTER
self.id = conf.get('settings', 'id') if conf.has_option('settings', 'id') else None
self.name = conf.get('settings', 'name')
self.logfile = conf.get('settings', 'logfile') if conf.has_option('settings', 'logfile') else "apibackuper.log"
self.data_key = conf.get('data', 'data_key')
self.storage_type = conf.get('storage', 'storage_type')
self.http_mode = conf.get('project', 'http_mode')
@@ -92,6 +100,12 @@ def __read_config(self, filename):
self.page_limit = conf.getint('params', 'page_size_limit')
self.resp_type = conf.get('project', 'resp_type') if conf.has_option('project', 'resp_type') else 'json'
self.iterate_by = conf.get('project', 'iterate_by') if conf.has_option('project', 'iterate_by') else 'page'
self.default_delay = conf.getint('project', 'default_delay') if conf.has_option('project', 'default_delay') else DEFAULT_DELAY
self.retry_delay = conf.getint('project', 'retry_delay') if conf.has_option('project', 'retry_delay') else RETRY_DELAY
self.force_retry = conf.getboolean('project', 'force_retry') if conf.has_option('project', 'force_retry') else False
self.retry_count = conf.getint('project', 'retry_count') if conf.has_option('project', 'retry_count') else DEFAULT_RETRY_COUNT

self.start_page = conf.getint('params', 'start_page') if conf.has_option('params', 'start_page') else 1
self.query_mode = conf.get('params', 'query_mode') if conf.has_option('params', 'query_mode') else "query"
@@ -216,6 +230,31 @@ def export(self, format, filename):
logging.info('Data exported to %s' % (filename))


def _single_request(self, url, headers, params, flatten=None):
    """Performs one GET or POST request and returns the raw response"""
if self.http_mode == 'GET':
if self.flat_params and len(params.keys()) > 0:
s = []
for k, v in flatten.items():
s.append('%s=%s' % (k, v.replace("'", '"').replace('True', 'true')))
logging.info('url: %s' % (url + '?' + '&'.join(s)))
if headers:
response = self.http.get(url + '?' + '&'.join(s), headers=headers)
else:
response = self.http.get(url + '?' + '&'.join(s))
else:
logging.info('url: %s, params: %s' % (url, str(params)))
if headers:
response = self.http.get(url, params=params, headers=headers)
else:
response = self.http.get(url, params=params)
else:
logging.debug('Request %s, params %s, headers %s' % (url, str(params), str(headers)))
if headers:
response = self.http.post(url, json=params, headers=headers)
else:
response = self.http.post(url, json=params)
return response
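
In the GET branch with flat_params enabled, _single_request assembles the query string by hand, rewriting Python-style literals in the stringified values into JSON-style ones. A standalone illustration of that conversion, with hypothetical parameter names and values:

    # What the flat-params branch does to stringified parameter values
    flatten = {'filter': "{'status': True}", 'page': '1'}
    s = []
    for k, v in flatten.items():
        s.append('%s=%s' % (k, v.replace("'", '"').replace('True', 'true')))
    print('&'.join(s))  # filter={"status": true}&page=1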

def run(self, mode):
if self.config is None:
print('Config file not found. Please run from the project directory')
@@ -254,6 +293,8 @@ def run(self, mode):
flatten = {}
for k, v in params.items():
flatten[k] = str(v)
else:
flatten = None

url_params = None
params_file = os.path.join(self.project_path, 'url_params.json')
@@ -289,7 +330,7 @@ def run(self, mode):
# if flat_params and len(params.keys()) > 0:
# response = self.http.post(url, json=flatten)
# else:
-logging.info('Request %s, params %s, headers %s' % (url, str(params), str(headers)))
+logging.debug('Request %s, params %s, headers %s' % (url, str(params), str(headers)))
if headers:
response = self.http.post(url, json=params, headers=headers, verify=False)
else:
@@ -330,6 +371,7 @@ def run(self, mode):
if self.query_mode in ['params', 'mixed']:
url_params.update(change_params)
else:
# print(params, change_params)
params = update_dict_values(params, change_params)
if self.flat_params and len(params.keys()) > 0:
for k, v in params.items():
@@ -340,41 +382,30 @@ def run(self, mode):
    url = _url_replacer(self.start_url, url_params, query_mode=True)
else:
    url = self.start_url
-if self.http_mode == 'GET':
-    if self.flat_params and len(params.keys()) > 0:
-        s = []
-        for k, v in flatten.items():
-            s.append('%s=%s' % (k, v.replace("'", '"').replace('True', 'true')))
-        logging.info('url: %s' % (url + '?' + '&'.join(s)))
-        if headers:
-            response = self.http.get(url + '?' + '&'.join(s), headers=headers)
-        else:
-            response = self.http.get(url + '?' + '&'.join(s))
-    else:
-        logging.info('url: %s, params: %s' % (url, str(params)))
-        if headers:
-            response = self.http.get(url, params=params, headers=headers)
-        else:
-            response = self.http.get(url, params=params)
-else:
-    # if flat_params and len(params.keys()) > 0:
-    #     response = self.http.post(url, json=flatten)
-    # else:
-    logging.info('Request %s, params %s, headers %s' % (url, str(params), str(headers)))
-    if headers:
-        response = self.http.post(url, json=params, headers=headers)
-    else:
-        response = self.http.post(url, json=params)
-if num_pages is not None:
-    logging.info('Saving page %d of %d' % (page, num_pages))
-else:
-    logging.info('Saving page %d' % (page))
-if self.resp_type == 'json':
-    outdata = response.content
-elif self.resp_type == 'xml':
-    outdata = json.dump(xmltodict.parse(response.content))
-mzip.writestr('page_%d.json' % (page), outdata)
-time.sleep(DEFAULT_DELAY)
+# All request logic now lives in _single_request(); on HTTP 500/503 the
+# request is retried up to retry_count times with retry_delay seconds
+# between attempts.
+response = self._single_request(url, headers, params, flatten)
+time.sleep(self.default_delay)
+if response.status_code in DEFAULT_ERROR_STATUS_CODES:
+    rc = 0
+    for rc in range(1, self.retry_count, 1):
+        logging.info('Retry attempt %d of %d, delay %d' % (rc, self.retry_count, self.retry_delay))
+        time.sleep(self.retry_delay)
+        response = self._single_request(url, headers, params, flatten)
+        if response.status_code not in DEFAULT_ERROR_STATUS_CODES:
+            logging.info('Got a proper response on retry attempt %d' % (rc))
+            break
+if response.status_code not in DEFAULT_ERROR_STATUS_CODES:
+    if num_pages is not None:
+        logging.info('Saving page %d of %d' % (page, num_pages))
+    else:
+        logging.info('Saving page %d' % (page))
+    if self.resp_type == 'json':
+        outdata = response.content
+    elif self.resp_type == 'xml':
+        outdata = json.dumps(xmltodict.parse(response.content))
+    mzip.writestr('page_%d.json' % (page), outdata)
+else:
+    logging.info('Errors persist on page %d. Stopped' % (page))
+    break
mzip.close()

# pass
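
Stripped of project plumbing, the retry behaviour added in this commit is a fixed-delay retry on HTTP 500/503. A self-contained sketch of the same pattern built on the requests library; the URL and default values are placeholders:

    import time
    import logging

    import requests

    ERROR_STATUS_CODES = [500, 503]  # mirrors DEFAULT_ERROR_STATUS_CODES

    def get_with_retries(url, retry_count=5, retry_delay=5):
        """Fetch url, retrying on 500/503 with a fixed pause between attempts."""
        response = requests.get(url)
        if response.status_code in ERROR_STATUS_CODES:
            for attempt in range(1, retry_count):
                logging.info('Retry attempt %d of %d, delay %d', attempt, retry_count, retry_delay)
                time.sleep(retry_delay)
                response = requests.get(url)
                if response.status_code not in ERROR_STATUS_CODES:
                    break
        return response

    # response = get_with_retries('https://example.org/api/items?page=1')

As in the loop above, range(1, retry_count) yields at most retry_count - 1 retries after the initial request.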
@@ -537,7 +568,7 @@ def follow(self, mode='item'):
for key in finallist:
n += 1
url = self.follow_pattern + str(key)
-print(url)
+# print(url)
response = self.http.get(url)
logging.info('Saving object with id %s. %d of %d' % (key, n, total))
mzip.writestr('%s.json' % (key), response.content)
@@ -560,7 +591,7 @@ def getfiles(self, be_careful=False):
if not os.path.exists(storage_file):
print('Storage file not found')
return
-uniq_ids = []
+uniq_ids = set()

allfiles_name = os.path.join(self.storagedir, 'allfiles.csv')
if not os.path.exists(allfiles_name):
@@ -571,7 +602,7 @@ def getfiles(self, be_careful=False):
for fname in mzip.namelist():
n += 1
if n % 10 == 0:
-logging.info('Processed %d files' % (n))
+logging.info('Processed %d files, uniq ids %d' % (n, len(uniq_ids)))
tf = mzip.open(fname, 'r')
data = json.load(tf)
tf.close()
@@ -587,7 +618,10 @@ def getfiles(self, be_careful=False):
if file_data:
for uniq_id in file_data:
if uniq_id is not None:
-uniq_ids.append(uniq_id)
+if isinstance(uniq_id, list):
+    uniq_ids.update(set(uniq_id))
+else:
+    uniq_ids.add(uniq_id)
except KeyError:
logging.info('Data key: %s not found' % (str(self.data_key)))
else:
@@ -619,7 +653,7 @@ def getfiles(self, be_careful=False):
logging.info('Storing all filenames')
f = open(allfiles_name, 'w', encoding='utf8')
for u in uniq_ids:
-f.write(u + '\n')
+f.write(str(u) + '\n')
f.close()
else:
logging.info('Load all filenames')
@@ -666,7 +700,7 @@ def getfiles(self, be_careful=False):
n = 0
for uniq_id in uniq_ids:
if self.fetch_mode == 'prefix':
-url = self.root_url + uniq_id
+url = self.root_url + str(uniq_id)
elif self.fetch_mode == 'pattern':
url = self.root_url.format(uniq_id)
n += 1
@@ -696,9 +730,9 @@ def getfiles(self, be_careful=False):
continue
else:
if self.default_ext is not None:
-filename = uniq_id + '.' + self.default_ext
+filename = str(uniq_id) + '.' + self.default_ext
else:
-filename = uniq_id
+filename = str(uniq_id)
if self.storage_mode == 'filepath':
filename = urlparse(url).path
logging.info('Processing %s as %s' % (url, filename))
@@ -770,7 +804,7 @@ def estimate(self, mode):
else:
start_page_data = self.http.get(url + '?' + '&'.join(s)).json()
else:
-logging.info('Start request params: %s headers: %s' % (str(params), str(headers)))
+logging.debug('Start request params: %s headers: %s' % (str(params), str(headers)))
if headers and len(headers.keys()) > 0:
if params and len(params.keys()) > 0:
response = self.http.get(url, params=params, headers=headers)
2 changes: 0 additions & 2 deletions apibackuper/common.py
@@ -1,7 +1,6 @@
# coding: utf-8
import lxml.etree as etree
from collections import defaultdict
-import logging

def etree_to_dict(t, prefix_strip=True):
tag = t.tag if not prefix_strip else t.tag.rsplit('}', 1)[-1]
@@ -10,7 +9,6 @@ def etree_to_dict(t, prefix_strip=True):
if children:
dd = defaultdict(list)
for dc in map(etree_to_dict, children):
-# print(dir(dc))
for k, v in dc.items():
if prefix_strip:
k = k.rsplit('}', 1)[-1]
8 changes: 8 additions & 0 deletions apibackuper/constants.py
@@ -1,3 +1,11 @@
DEFAULT_OPTIONS = {'initialized' : False}
DEFAULT_DELAY = 0.5
RETRY_DELAY = 5
FIELD_SPLITTER = '.'
DEFAULT_RETRY_COUNT = 5

FILE_SIZE_DOWNLOAD_LIMIT = 270000000
DEFAULT_TIMEOUT = 10
PARAM_SPLITTER = ';'

DEFAULT_ERROR_STATUS_CODES = [500, 503]
2 changes: 2 additions & 0 deletions apibackuper/core.py
@@ -3,6 +3,8 @@
import click
import logging
from pprint import pprint
import urllib3
urllib3.disable_warnings()  # silence urllib3 warnings, notably InsecureRequestWarning from verify=False requests

from .cmds.project import ProjectBuilder

3 changes: 3 additions & 0 deletions requirements.txt
@@ -1 +1,4 @@
pymongo
lxml
click

2 changes: 1 addition & 1 deletion setup.py
@@ -34,7 +34,7 @@ def run_tests(self):


install_requires = [
-    'bson',
+    'bson', 'click', 'lxml',
]


