Skip to content

Commit

Permalink
Finalized worker and added post-processing
Browse files Browse the repository at this point in the history
Swapped the file name and the class name between Worker and Job:
File name goes from Worker -> Job
Class goes from Job -> Worker
It makes more sense to have a "worker" do a "job", hence the swap. It also allowed the Worker class to extend the new Post_Processing class.

Added post-processing with the Post_Processing class. These are functions that run after the Excel data has been inserted. Their properties are defined in mysql_config.json.

Finalized insert_all.py
  • Loading branch information
VictorWesterlund committed May 17, 2021
1 parent 39804c2 commit 31ba562
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 74 deletions.
13 changes: 11 additions & 2 deletions classes/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,17 @@ def create_config():
"mysql_passwd": "",
"mysql_db": ""
},
"create_tables": True, # Attempt to create missing tables in DB
"chunk_size": 100 # Number of rows to insert into db at a time
"rebuild_tables": True, # Rebuild tables and create if missing
"chunk_size": 100, # Number of rows to insert into db at a time
# Additional work after insertion is done
"post_processing": {
"change": {}, # Object with key,value pairs of columns to type cast
# Create indices (will run after "change")
"index": {
"primary": None, # Primary key (post-change),
"columns": [] # Array of columns to index
}
}
}

# Create config file
Expand Down
78 changes: 32 additions & 46 deletions classes/Database.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,15 @@
from MySQLdb import _mysql as MySQL_Connector
from .Database_MySQL import MySQL

class MySQL():
    """Small wrapper around a low-level `_mysql` connection.

    Keeps the connection, the selected database name, the default
    storage engine and the currently targeted (fully qualified) table.
    """

    def __init__(self,host,user,passwd,db):
        self.mysql = MySQL_Connector.connect(host=host,user=user,passwd=passwd,db=db)

        self._table = None
        self.engine = "InnoDB"
        self.db = db

    @property
    def table(self):
        """Backtick-quoted `db`.`table` identifier, or None until a table is set."""
        return self._table

    @table.setter
    def table(self,name):
        # Always qualify the table with the database given to the constructor
        self._table = f"`{self.db}`.`{name}`"

    # Run and return SQL query
    def query(self,sql,maxrows = 0):
        """Execute `sql` and return the fetched rows.

        Any exception raised on the way is caught and returned as a
        value instead of being raised.
        """
        try:
            self.mysql.query(sql)
            return self.mysql.store_result().fetch_row(maxrows=maxrows)
        except Exception as error:
            return error

    # Return true if an SQL query returned a match
    def truthy(self,result):
        """Return False for an exception or an empty result, True otherwise."""
        return not (isinstance(result,Exception) or len(result) < 1)

    def escape(self,string):
        """Escape `string` through the connection and decode the result as UTF-8."""
        return self.mysql.escape_string(string).decode("utf-8")
# Lookup of upper-cased, stringified cell values to raw SQL literals.
# A value whose upper-cased string matches a key here is written into the
# INSERT statement as the mapped literal, unquoted, instead of as a '...' string.
# NOTE(review): "NAT"/"NAN" presumably correspond to str() of pandas NaT/NaN — confirm.
# NOTE(review): a genuine text cell such as "True" or "NaN" is coerced too — confirm this is intended.
coercive_datatype = {
"TRUE": "1",
"FALSE": "0",
"NAT": "NULL",
"NAN": "NULL"
}

# Contains bulk variants of methods defined in Database class
# These methods follow an arbitrary protocol for parsing data
class BulkTools():
class Database_BulkTools():
# Append multiple columns at once
# Protocol:
# - Expects a list of string key, value pairs
Expand All @@ -55,7 +26,7 @@ def append_columns(self,columns):
if(column_type == "VARCHAR"):
column_type = f"VARCHAR({self.default_length})"

queries.append(f"ADD `{column_name}` {column_type} NOT NULL")
queries.append(f"ADD `{column_name}` {column_type}")

# Concat all queries and execute
queries = ",".join(queries)
Expand Down Expand Up @@ -84,6 +55,11 @@ def wrap_column(column):
value = str(value)
value = self.escape(value)

# Translate data types to value
if(value.upper() in coercive_datatype):
values.append(coercive_datatype[value.upper()])
continue

# Add the value for this row as a string ''
values.append(f"'{value}'")

Expand All @@ -95,7 +71,7 @@ def wrap_column(column):
sql = f"INSERT INTO {self.table} ({columns_sql}) VALUES {values};"
return self.bquery(sql)

class Database(MySQL,BulkTools):
class Database(MySQL,Database_BulkTools):
def __init__(self,host,user,passwd,db):
super(Database,self).__init__(host,user,passwd,db)
self.default_length = 256
Expand Down Expand Up @@ -141,7 +117,7 @@ def create_table(self):
if(self.table_exists()):
return False

sql = f"CREATE TABLE {self.table} (`{self.placeholder}` INT NULL) CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_unicode_ci' ENGINE = {self.engine};"
sql = f"CREATE TABLE {self.table} (`{self.placeholder}` INT NULL) CHARACTER SET {self.charset} COLLATE {self.collate} ENGINE = {self.engine};"
return self.bquery(sql)

# Append new column
Expand All @@ -160,23 +136,33 @@ def append_column(self,name,data_type = "int",length = -1):
def make_unique(self,column,update = False):
if(update):
# Update existing primary key
sql = f"ALTER TABLE {self.table} DROP PRIMARY KEY, ADD PRIMARY KEY(`{column}`)"
sql = f"ALTER TABLE {self.table} DROP PRIMARY KEY, ADD PRIMARY KEY(`{column}`);"
return self.bquery(sql)
else:
# Add new primary key
sql = f"ALTER TABLE {self.table} ADD PRIMARY KEY(`{column}`)"
if(not self.bquery(sql)):
sql = f"ALTER TABLE {self.table} ADD PRIMARY KEY(`{column}`);"
result = self.bquery(sql)
if(not result):
# Attempt to update existing if query fails
self.make_unique(column,True)
return result

def make_index(self,column):
    """Add an index on `column` of the current table.

    Returns whatever self.bquery() returns for the ALTER TABLE statement.
    """
    return self.bquery(f"ALTER TABLE {self.table} ADD INDEX(`{column}`);")

def change(self,column,datatype):
    """Change the data type of `column` (keeping its name) and make it nullable.

    column   -- name of the column to alter
    datatype -- SQL type to cast the column to, e.g. "INT" or "VARCHAR(64)"

    The CHARACTER SET / COLLATE clause is only emitted for character
    types; MySQL rejects it on numeric and temporal types, so casting a
    column to e.g. INT would otherwise always fail.

    Returns whatever self.bquery() returns for the ALTER TABLE statement.
    """
    # Types that accept a CHARACTER SET attribute
    character_types = ("CHAR","VARCHAR","TINYTEXT","TEXT","MEDIUMTEXT","LONGTEXT","ENUM","SET")

    # Base type name: text before any "(length)" and before any modifiers
    words = str(datatype).strip().split("(",1)[0].split()
    base_type = words[0].upper() if words else ""

    charset_clause = ""
    if(base_type in character_types):
        charset_clause = f" CHARACTER SET {self.charset} COLLATE {self.collate}"

    sql = f"ALTER TABLE {self.table} CHANGE `{column}` `{column}` {datatype}{charset_clause} NULL DEFAULT NULL;"
    return self.bquery(sql)

def drop_column(self,column):
sql = f"ALTER TABLE {self.table} DROP `{column}`";
sql = f"ALTER TABLE {self.table} DROP `{column}`;";
return self.bquery(sql)

def truncate(self):
sql = f"TRUNCATE {self.table}"
sql = f"TRUNCATE TABLE {self.table};"
return self.bquery(sql)

def drop(self):
sql = f"DROP {self.table}"
sql = f"DROP TABLE {self.table};"
return self.bquery(sql)
40 changes: 40 additions & 0 deletions classes/Database_MySQL.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from MySQLdb import _mysql as MySQL_Connector

class MySQL():
    """Small wrapper around a low-level `_mysql` connection.

    Keeps the connection, the selected database name, table defaults
    (engine, charset, collation) and the currently targeted table.
    """

    def __init__(self,host,user,passwd,db):
        # NOTE(review): the connection charset is "utf8" while self.charset
        # below is "utf8mb4" — confirm the difference is intended.
        self.mysql = MySQL_Connector.connect(host=host,user=user,passwd=passwd,db=db,charset="utf8")

        # No table selected until the `table` property is assigned
        self._table = None
        self.db = db

        # Defaults exposed as attributes for statements built elsewhere
        self.collate = "utf8mb4_unicode_ci"
        self.charset = "utf8mb4"
        self.engine = "InnoDB"

    @property
    def table(self):
        """Backtick-quoted `db`.`table` identifier, or None until a table is set."""
        return self._table

    @table.setter
    def table(self,name):
        # Always qualify the table with the database given to the constructor
        self._table = f"`{self.db}`.`{name}`"

    # Run and return SQL query
    def query(self,sql,maxrows = 0):
        """Execute `sql` and return the fetched rows.

        Any exception raised on the way is caught and returned as a
        value instead of being raised.
        """
        # NOTE(review): if store_result() yields None for statements without
        # a result set, the resulting AttributeError is what gets returned
        # here — verify that callers expect this.
        try:
            self.mysql.query(sql)
            return self.mysql.store_result().fetch_row(maxrows=maxrows)
        except Exception as error:
            return error

    # Return true if an SQL query returned a match
    def truthy(self,result):
        """Return False for an exception or an empty result, True otherwise."""
        return not (isinstance(result,Exception) or len(result) < 1)

    def escape(self,string):
        """Escape `string` through the connection and decode the result as UTF-8."""
        return self.mysql.escape_string(string).decode("utf-8")
69 changes: 47 additions & 22 deletions classes/Worker.py → classes/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,51 @@

# Data type translation lookup table
datatypes = {
"str": "TEXT",
"NaTType": "VARCHAR",
"int": "INT",
"int64": "INT",
"float": "FLOAT",
"float64": "DOUBLE",
"bool_": "BOOLEAN",
"Timestamp": "DATE",
"object": "TEXT",
"int": "VARCHAR",
"int64": "VARCHAR",
"float": "VARCHAR",
"float64": "VARCHAR",
"bool": "BOOLEAN",
"datetime64[ns]": "DATE",
}

class Job():
class Post_Processing():
    """Post-insertion steps driven by the "post_processing" config section.

    Every step uses self.db, which this class never assigns itself; it has
    to be provided by the object these methods run on (e.g. a subclass).
    """

    def __init__(self,config):
        # Only the "post_processing" section of the config is kept
        self.config = config["post_processing"]

    # Type cast columns (SQL CHANGE)
    def change(self):
        """Call self.db.change() for every column/datatype pair in config["change"]."""
        # An empty mapping simply results in no calls
        for name,target_type in self.config["change"].items():
            self.db.change(name,target_type)

    def primary_key(self):
        """Make the configured primary column unique.

        Returns False when no primary column is configured, otherwise
        the result of self.db.make_unique().
        """
        key_column = self.config["index"]["primary"]
        if(key_column):
            return self.db.make_unique(key_column)
        return False

    def index(self):
        """Call self.db.make_index() for every column in config["index"]["columns"].

        Returns False when the list is empty, otherwise None.
        """
        targets = self.config["index"]["columns"]

        if(len(targets) == 0):
            return False

        for name in targets:
            self.db.make_index(name)

class Worker(Post_Processing):
def __init__(self,db,excel):
self.db = db
self.data = excel.dataframe
self.columns = excel.get_headers()

self._chunk_size = 100

# Use first column as primary key
primary = self.columns[0]
self.db.append_column(primary)
self.db.make_unique(primary)

@property
def chunk_size(self):
return self._chunk_size
Expand All @@ -33,21 +55,23 @@ def chunk_size(self):
def chunk_size(self,size):
self._chunk_size = size

def truncate(self):
print(f"Table {self.db.table} is about to get wiped (truncated) for data-entry. Interrupt within 10 seconds to abort.")
time.sleep(10)
self.db.truncate()
# Post processing invoker and sequencer
def post_processing(self,config):
super(Worker,self).__init__(config)
self.change()
self.primary_key()
self.index()

# Create database columns from Excel headers
def create_columns(self):
columns = {}
for column in self.columns:
# Let first item for Excel header determine the column data type
datatype = type(self.data[column][0]).__name__
datatype = str(self.data[column].dtype)

# Treat unknown data types as strings
# Treat unknown data types as pandas object (string)
if(datatype not in datatypes):
datatype = "str"
datatype = "object"

# Translate Python (panda) types to SQL types
columns[column] = datatypes[datatype]
Expand All @@ -67,7 +91,8 @@ def insert_rows(self):

values = []
for column in self.columns:
values.append(self.data[column][index])
value = self.data[column][index]
values.append(value)
rows.append(values)

# Insert remaining rows
Expand Down
13 changes: 9 additions & 4 deletions insert_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from classes.Excel import Excel
from classes.Database import Database
from classes.Config import get_config
from classes.Worker import Job
from classes.Job import Worker

excel_dir = "sheets/"

Expand All @@ -21,8 +21,13 @@
# Use the extensionless file name as table name in the selected db
db.table = excel_name

if(db_config["create_tables"]):
# Drop and re-create table
if(db_config["rebuild_tables"]):
db.drop()
db.create_table()

worker = Job(db,excel)
worker.run()
job = Worker(db,excel)
job.run()

# Perform additional work defined in config
job.post_processing(db_config)

0 comments on commit 31ba562

Please sign in to comment.