Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executemany has an option for 'bulk'-insert on Impala #96

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions impala/dbapi/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import time
import getpass

from impala.dbapi.interface import Connection, Cursor, _bind_parameters
from impala.dbapi.interface import Connection, Cursor, _bind_parameters, RE_INSERT_VALUES
from impala._rpc import hiveserver2 as rpc
from impala.error import NotSupportedError, OperationalError, ProgrammingError
from impala._thrift_api.hiveserver2 import TProtocolVersion
Expand Down Expand Up @@ -201,13 +201,49 @@ def _get_sleep_interval(self, start_time):
return 0.5
return 1.0

def executemany(self, operation, seq_of_parameters):
def _exec_single_query_insert(self, operation, seq_of_parameters):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of _exec_single_query_insert(), could you have a method called _rewrite_inserts_single_query() or something like that, which takes in operation and seq_of_parameters and returns the combined single SQL query string?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And returns None if it fails.

"""
Builds and executes one big INSERT...VALUES statement instead of sending
a sequence of one-row inserts.
Assumes that `operation` is an INSERT...VALUES statement.
Returns `True` if the operation was executed and `False` otherwise.

INSERT...VALUES is available in Impala and multiple row inserts are
supported.
"""
match = RE_INSERT_VALUES.match(operation)
if match and seq_of_parameters and len(seq_of_parameters) > 1:
# Split the "INSERT...VALUES (...)" query into two parts:
query_left = match.group(1) # "INSERT...VALUES"
values = match.group(2) # "(...)"

def op():
# Bind each row's parameters to `values` [ "(...)" ]
bound_params = map(
lambda params: _bind_parameters(values, params),
seq_of_parameters)
# Build "big" query: "INSERT...VALUES (...), (...), (...), ..."
self._last_operation_string = query_left + ", ".join(bound_params)
self._last_operation_handle = rpc.execute_statement(
self.service, self.session_handle, self._last_operation_string,
None)

self._execute_sync(op)
return True
return False

def executemany(self, operation, seq_of_parameters, single_query_insert=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps change the name of the new flag to rewrite_inserts_single_query, or something like that? I think it would be more clear as to exactly what's happening.

# PEP 249
for parameters in seq_of_parameters:
self.execute(operation, parameters)
if self.has_result_set:
raise ProgrammingError("Operations that have result sets are "
"not allowed with executemany.")

# If the condition below holds, we either don't want to build a single
# insert query or were not able to build it.
if not single_query_insert or \
not self._exec_single_query_insert(operation, seq_of_parameters):
for parameters in seq_of_parameters:
self.execute(operation, parameters)
if self.has_result_set:
raise ProgrammingError("Operations that have result sets are "
"not allowed with executemany.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the logic here a bit confusing. I'd prefer something more like

single_query = None
if rewrite_inserts_single_query:
    single_query = self._exec_single_query_insert(...)
if single_query:
    self.execute(single_query)
else:
    for parameters in seq_of_parameters:
        ...


def fetchone(self):
# PEP 249
Expand Down
2 changes: 2 additions & 0 deletions impala/dbapi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
IntegrityError, DataError, NotSupportedError)


RE_INSERT_VALUES = re.compile(r"(.*\binsert\b.*\bvalues\b\s*)(\(.*\))\s*;?\s*", flags=re.IGNORECASE)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: insert an additional blank line per PEP 8.

class Connection(object):
# PEP 249
# Connection objects are associated with a TCLIService.Client thrift
Expand Down
56 changes: 56 additions & 0 deletions impala/tests/_dbapi20_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,62 @@ def _paraminsert(self,cur):
'incorrectly'
)

def _get_number_of_files(self, cur, table_name):
    """Return the ``#Files`` value from the first ``SHOW TABLE STATS`` row.

    Runs ``SHOW TABLE STATS <table_name>`` on ``cur``, locates the column
    named ``#Files`` in ``cur.description`` and returns that field of the
    first row fetched from the cursor.

    Raises ``ValueError`` (from ``list.index``) when the result set has no
    ``#Files`` column.
    """
    cur.execute("SHOW TABLE STATS %s" % table_name)
    # Find the column by name instead of relying on a fixed position.
    headers = [column[0] for column in cur.description]
    files_position = headers.index("#Files")
    first_row = cur.fetchone()
    return first_row[files_position]

def test_executemany_single_query(self):
    """Check that ``executemany(..., single_query_insert=True)`` adds one file.

    Inserts two rows into ``<table_prefix>booze`` with a single
    ``executemany`` call, using the placeholder syntax that matches
    ``self.driver.paramstyle``, and asserts that the ``#Files`` count
    reported by ``_get_number_of_files`` grew by exactly one.
    The connection is always closed, even when the test fails.
    """
    con = self._connect()
    try:
        cur = con.cursor()
        # presumably creates the `<prefix>booze` table -- helper not visible here
        self.executeDDL1(cur)
        prefix = self.table_prefix
        table_name = "%sbooze" % prefix
        files_at_start = self._get_number_of_files(cur, table_name)

        positional_rows = [("Cooper's",), ("Boag's",)]
        keyed_rows = [{'beer': "Cooper's"}, {'beer': "Boag's"}]
        # paramstyle -> (SQL template awaiting the table prefix, parameter rows)
        statements = {
            'qmark': ('insert into %sbooze values (?)', positional_rows),
            'numeric': ('insert into %sbooze values (:1)', positional_rows),
            'named': ('insert into %sbooze values (:beer)', keyed_rows),
            'format': ('insert into %sbooze values (%%s)', positional_rows),
            'pyformat': ('insert into %sbooze values (%%(beer)s)', keyed_rows),
        }

        chosen = statements.get(self.driver.paramstyle)
        if chosen is None:
            self.fail('Unknown paramstyle')
        else:
            template, rows = chosen
            cur.executemany(template % prefix, rows, single_query_insert=True)

        files_at_end = self._get_number_of_files(cur, table_name)
        self.assertEqual(
            files_at_end,
            files_at_start + 1,
            'cursor.executemany(..., single_query_insert=True) should '
            'produce only one file (produced %d)' %
            (files_at_end - files_at_start),
        )
    finally:
        con.close()

def test_executemany(self):
con = self._connect()
try:
Expand Down