Skip to content

Commit

Permalink
made function more robust per comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-kovalsky committed Jan 8, 2025
1 parent b468e00 commit 7d5c66c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 11 deletions.
14 changes: 11 additions & 3 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,16 +1188,24 @@ def generate_guid():
return str(uuid.uuid4())


def _get_column_aggregate(
    lakehouse: str,
    table_name: str,
    column_name: str = "RunId",
    function: str = "max",
    default_value: int = 0,
) -> int:
    """
    Run a single aggregate over one column of a lakehouse table via Spark SQL.

    Parameters
    ----------
    lakehouse : str
        Name of the lakehouse; used as the qualifier in ``{lakehouse}.{table_name}``.
    table_name : str
        Name of the table to query.
    column_name : str, default="RunId"
        Column the aggregate is applied to.
    function : str, default="max"
        SQL aggregate function name (case-insensitive). Any value containing
        both "count" and "distinct" is executed as ``COUNT(DISTINCT(column))``.
    default_value : int, default=0
        Value returned when the aggregate yields NULL.

    Returns
    -------
    int
        The first column of the first result row, or ``default_value`` when
        that value is NULL. (The actual type follows the column/aggregate;
        callers in this package use it as an integer run id.)
    """

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    function = function.upper()

    # NOTE: the identifiers are interpolated directly into the SQL text, so
    # they must come from trusted code, never from end-user input.
    if "COUNT" in function and "DISTINCT" in function:
        query = f"SELECT COUNT(DISTINCT({column_name})) FROM {lakehouse}.{table_name}"
    else:
        query = f"SELECT {function}({column_name}) FROM {lakehouse}.{table_name}"

    result = spark.sql(query).collect()[0][0]

    # Only a NULL aggregate falls back to the default. A plain `or` would also
    # discard a legitimate result of 0 whenever default_value is non-zero.
    return default_value if result is None else result


def _make_list_unique(my_list):
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_model_bpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
resolve_workspace_capacity,
resolve_dataset_name_and_id,
get_language_codes,
_get_column_value,
_get_column_aggregate,
resolve_workspace_name_and_id,
)
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
Expand Down Expand Up @@ -383,7 +383,7 @@ def translate_using_spark(rule_file):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_column_value(
max_run_id = _get_column_aggregate(
lakehouse=lakehouse, table_name=delta_table_name
)
runId = max_run_id + 1
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_model_bpa_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
save_as_delta_table,
resolve_workspace_capacity,
retry,
_get_column_value,
_get_column_aggregate,
)
from sempy_labs.lakehouse import (
get_lakehouse_tables,
Expand Down Expand Up @@ -76,7 +76,7 @@ def run_model_bpa_bulk(
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_column_value(lakehouse=lakehouse, table_name=output_table)
max_run_id = _get_column_aggregate(lakehouse=lakehouse, table_name=output_table)
runId = max_run_id + 1

if isinstance(workspace, str):
Expand Down
6 changes: 4 additions & 2 deletions src/sempy_labs/_vertipaq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
resolve_lakehouse_name,
save_as_delta_table,
resolve_workspace_capacity,
_get_column_value,
_get_column_aggregate,
resolve_workspace_name_and_id,
resolve_dataset_name_and_id,
)
Expand Down Expand Up @@ -519,7 +519,9 @@ def _style_columns_based_on_types(dataframe: pd.DataFrame, column_type_mapping):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_column_value(lakehouse=lakehouse, table_name=lakeTName)
max_run_id = _get_column_aggregate(
lakehouse=lakehouse, table_name=lakeTName
)
runId = max_run_id + 1

dfMap = {
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/report/_report_bpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
resolve_report_id,
resolve_lakehouse_name,
resolve_workspace_capacity,
_get_column_value,
_get_column_aggregate,
resolve_workspace_name_and_id,
)
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
Expand Down Expand Up @@ -217,7 +217,7 @@ def execute_rule(row):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_column_value(
max_run_id = _get_column_aggregate(
lakehouse=lakehouse, table_name=delta_table_name
)
runId = max_run_id + 1
Expand Down

0 comments on commit 7d5c66c

Please sign in to comment.