-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executemany has an option for 'bulk'-insert on Impala #96
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
import time | ||
import getpass | ||
|
||
from impala.dbapi.interface import Connection, Cursor, _bind_parameters | ||
from impala.dbapi.interface import Connection, Cursor, _bind_parameters, RE_INSERT_VALUES | ||
from impala._rpc import hiveserver2 as rpc | ||
from impala.error import NotSupportedError, OperationalError, ProgrammingError | ||
from impala._thrift_api.hiveserver2 import TProtocolVersion | ||
|
@@ -201,13 +201,49 @@ def _get_sleep_interval(self, start_time): | |
return 0.5 | ||
return 1.0 | ||
|
||
def executemany(self, operation, seq_of_parameters): | ||
def _exec_single_query_insert(self, operation, seq_of_parameters): | ||
""" | ||
Builds and executes one big INSERT...VALUES statement instead of sending | ||
sequency of one-row inserts. | ||
Assumes that `operation` is INSERT...VALUES statement. | ||
Returns `True` if operation was executed and `False` otherwise. | ||
|
||
INSERT...VALUES is available in Impala and multiple row inserts are | ||
supported. | ||
""" | ||
match = RE_INSERT_VALUES.match(operation) | ||
if match and seq_of_parameters and len(seq_of_parameters) > 1: | ||
# Split "INSERT...VALUES (...)" query into | ||
query_left = match.group(1) # "INSERT...VALUES" | ||
values = match.group(2) # "(...)" | ||
|
||
def op(): | ||
# Bind each row's parameters to `values` [ "(...)" ] | ||
bound_params = map( | ||
lambda params: _bind_parameters(values, params), | ||
seq_of_parameters) | ||
# Build "big" query: "INSERT...VALUES (...), (...), (...), ..." | ||
self._last_operation_string = query_left + ", ".join(bound_params) | ||
self._last_operation_handle = rpc.execute_statement( | ||
self.service, self.session_handle, self._last_operation_string, | ||
None) | ||
|
||
self._execute_sync(op) | ||
return True | ||
return False | ||
|
||
def executemany(self, operation, seq_of_parameters, single_query_insert=False): | ||
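There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps change the name of the new flag to [flag name lost in extraction; presumably `rewrite_inserts_single_query`, the name used in the reviewer's suggested snippet further down] |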
||
# PEP 249 | ||
for parameters in seq_of_parameters: | ||
self.execute(operation, parameters) | ||
if self.has_result_set: | ||
raise ProgrammingError("Operations that have result sets are " | ||
"not allowed with executemany.") | ||
|
||
# If condition above holds, we either don't want to build single insert | ||
# query or were not able to build it. | ||
if not single_query_insert or \ | ||
not self._exec_single_query_insert(operation, seq_of_parameters): | ||
for parameters in seq_of_parameters: | ||
self.execute(operation, parameters) | ||
if self.has_result_set: | ||
raise ProgrammingError("Operations that have result sets are " | ||
"not allowed with executemany.") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I find the logic here a bit confusing. I'd prefer something more like:
    single_query = None
    if rewrite_inserts_single_query:
        single_query = self._exec_single_query_insert(...)
    if single_query:
        self.execute(single_query)
    else:
        for parameters in seq_of_parameters:
            ... |
||
|
||
def fetchone(self): | ||
# PEP 249 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ | |
IntegrityError, DataError, NotSupportedError) | ||
|
||
|
||
RE_INSERT_VALUES = re.compile(r"(.*\binsert\b.*\bvalues\b\s*)(\(.*\))\s*;?\s*", flags=re.IGNORECASE) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: insert an additional blank line per PEP 8 |
||
class Connection(object): | ||
# PEP 249 | ||
# Connection objects are associated with a TCLIService.Client thrift | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of `_exec_single_query_insert()`, could you have a method called `_rewrite_inserts_single_query()` or something like that, which takes in `operation` and `seq_of_parameters` and returns the combined single SQL query string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And returns `None` if it fails.