Skip to content

Commit

Permalink
fixup! [IMP] util.explode_query_range: replace problematic `parallel_…
Browse files Browse the repository at this point in the history
…filter` formatting
  • Loading branch information
Pirols committed Sep 26, 2024
1 parent ed1dee3 commit 8bbcc71
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/base/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ def test_explode_format_parallel_filter(self):
q1,
table="res_users",
bucket_size=1,
format=False,
)[0]
self.assertEqual(out1, expected_out)

Expand All @@ -682,6 +683,7 @@ def test_explode_format_parallel_filter(self):
q2,
table="res_users",
bucket_size=1,
format=False,
)[0]
self.assertEqual(out2, expected_out)

Expand Down
23 changes: 18 additions & 5 deletions src/util/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def explode_query(cr, query, alias=None, num_buckets=8, prefix=None):
return [cr.mogrify(query, [num_buckets, index]).decode() for index in range(num_buckets)]


def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix=None):
def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix=None, format=True):
"""
Explode a query to multiple queries that can be executed in parallel.
Expand Down Expand Up @@ -297,16 +297,27 @@ def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix=
# Still, since the query may only be valid if there is no split, we force the usage of `prefix` in the query to
# validate its correctness and avoid scripts that pass the CI but fail in production.
parallel_filter = "{alias}.id IS NOT NULL".format(alias=alias)
return [query.replace("{parallel_filter}", parallel_filter)]
return [
(
query.format(parallel_filter=parallel_filter)
if format
else query.replace("{parallel_filter}", parallel_filter)
)
]

parallel_filter = "{alias}.id BETWEEN %(lower-bound)s AND %(upper-bound)s".format(alias=alias)
query = query.replace("%", "%%").replace("{parallel_filter}", parallel_filter)

query = query.replace("%", "%%")
query = (
query.format(parallel_filter=parallel_filter) if format else query.replace("{parallel_filter}", parallel_filter)
)

return [
cr.mogrify(query, {"lower-bound": ids[i], "upper-bound": ids[i + 1] - 1}).decode() for i in range(len(ids) - 1)
]


def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_logger):
def explode_execute(cr, query, table, alias=None, bucket_size=10000, format=True, logger=_logger):
"""
Execute a query in parallel.
Expand Down Expand Up @@ -336,6 +347,8 @@ def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_log
:param str table: name of the *main* table of the query, used to split the processing
:param str alias: alias used for the main table in the query
:param int bucket_size: size of the buckets of ids to split the processing
:param bool format: Whether to use `.format` (instead of `.replace`) to inject the parallel filter.
Setting it to `False` can prevent issues with hard-coded curly braces.
:param logger: logger used to report the progress
:type logger: :class:`logging.Logger`
:return: the sum of `cr.rowcount` for each query run
Expand All @@ -349,7 +362,7 @@ def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_log
"""
return parallel_execute(
cr,
explode_query_range(cr, query, table, alias=alias, bucket_size=bucket_size),
explode_query_range(cr, query, table, alias=alias, bucket_size=bucket_size, format=format),
logger=logger,
)

Expand Down

0 comments on commit 8bbcc71

Please sign in to comment.