Skip to content

Commit

Permalink
Update dumpit.py
Browse files Browse the repository at this point in the history
- Check the size of the output file after the dump and remove the file
  if it is effectively empty (smaller than 1000 bytes)
- Tidy up handling of file names and paths
- Code formatting
  • Loading branch information
m-appel committed Jan 24, 2025
1 parent 6fad033 commit c897648
Showing 1 changed file with 46 additions and 33 deletions.
79 changes: 46 additions & 33 deletions dumpit.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import arrow
import json
import logging
import os

import arrow


class Dumper():
def __init__(self, config_fname):
""" Initialize crawler with variables from config file"""
Expand All @@ -14,8 +16,8 @@ def __init__(self, config_fname):
def fname(self, startdate):
"""Construct the folder and filename for the given date and config file"""

dump_folder = f"{self.config['dump_root']}/{startdate.year:04d}/{startdate.month:02d}/{startdate.day:02d}/"
dump_fname = self.config['dump_fname']+'_'+startdate.format("YYYY-MM-DD")+'.csv'
dump_folder = os.path.join(self.config['dump_root'], startdate.format('YYYY/MM/DD'))
dump_fname = f'{self.config["dump_fname"]}_{startdate.format("YYYY-MM-DD")}.csv'

return dump_folder, dump_fname

Expand All @@ -26,64 +28,76 @@ def dump(self, date, compress='lz4'):

query = self.config['query'].format(startdate=startdate, enddate=enddate)
dump_folder, dump_fname = self.fname(startdate)
intermediate_output_file = os.path.join(dump_folder, dump_fname)
final_output_file = intermediate_output_file
if compress:
final_output_file += f'.{compress}'

# Check if dump already exists
if(os.path.exists(dump_folder+dump_fname+'.lz4')
and os.path.getsize(dump_folder+dump_fname+'.lz4') > 1000):
logging.error(f'{dump_folder}{dump_fname}.lz4 already exists')
if os.path.exists(final_output_file):
logging.error(f'{final_output_file} already exists')
return

# create directories if needed
os.makedirs(dump_folder, exist_ok=True)
os.makedirs(dump_folder, exist_ok=True)

cmd = r"""psql -d {db} -h {psql_host} -U {psql_role} -c "\copy ({query}) to '{fname}' csv header;" """.format(
db=self.config['database'],
psql_host=PSQL_HOST,
psql_role=PSQL_ROLE,
query=query,
fname=dump_folder+dump_fname
)
fname=intermediate_output_file
)

logging.debug(f'Dumping data to csv file ({cmd})...')
ret_value = os.system( cmd )
ret_value = os.system(cmd)
if ret_value != 0:
logging.error(f'Could not dump data? Returned value: {ret_value}')

if compress:
cmd = f'{compress} -f {dump_folder}{dump_fname} {dump_folder}{dump_fname}.{compress}'
cmd = f'{compress} -f {intermediate_output_file} {final_output_file}'
logging.debug(f'Compressing data ({cmd})...')
ret_value = os.system( cmd )
os.remove(dump_folder+dump_fname)
ret_value = os.system(cmd)
os.remove(intermediate_output_file)

if ret_value != 0:
logging.error(f'Could not compress data? Returned value: {ret_value}')

if not os.path.exists(final_output_file):
logging.error(f'No output file created: {final_output_file}')
return

if os.path.getsize(final_output_file) < 1000:
logging.warning(f'Output file was empty. Deleting {final_output_file}')
os.remove(final_output_file)


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s %(processName)s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[logging.StreamHandler()])
format='%(asctime)s %(processName)s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[logging.StreamHandler()])

global PSQL_HOST
PSQL_HOST = os.environ["PSQL_HOST"]
global PSQL_ROLE
PSQL_ROLE = os.environ["PSQL_ROLE"]

parser = argparse.ArgumentParser(
description='Dump data from the database to a CSV file')
parser.add_argument('--config', type=str,
help='configuration file with query and file structure details')
parser.add_argument('--dates', default='', type=str,
help='file containing a list of dates to dump (one date per line)')
parser.add_argument('--date', default='', type=str,
help='date to dump (e.g. 2022-01-20)')
parser.add_argument('--startdate', default='', type=str,
help='start date for a range of dates. Should also specify enddate')
parser.add_argument('--enddate', default='', type=str,
help='end date for a range of dates. Should also specify startdate')
parser.add_argument('--frequency', default='day', type=str,
help='frequency for a range of dates (default: day)')
description='Dump data from the database to a CSV file')
parser.add_argument('--config', type=str,
help='configuration file with query and file structure details')
parser.add_argument('--dates', default='', type=str,
help='file containing a list of dates to dump (one date per line)')
parser.add_argument('--date', default='', type=str,
help='date to dump (e.g. 2022-01-20)')
parser.add_argument('--startdate', default='', type=str,
help='start date for a range of dates. Should also specify enddate')
parser.add_argument('--enddate', default='', type=str,
help='end date for a range of dates. Should also specify startdate')
parser.add_argument('--frequency', default='day', type=str,
help='frequency for a range of dates (default: day)')

args = parser.parse_args()

Expand Down Expand Up @@ -111,4 +125,3 @@ def dump(self, date, compress='lz4'):
# Log any error that could happen
except Exception as e:
logging.error('Error', exc_info=e)

0 comments on commit c897648

Please sign in to comment.